当前位置: 首页 > news >正文

多线程-线程池

简介

线程池:池化技术的一种,用于存储线程,实现线程的复用。线程池机制避免了频繁的创建和销毁线程,可以提升程序的性能

线程池的使用

核心API:ThreadPoolExecutor

ThreadPoolExecutor是线程池的核心类,提供了线程池的核心功能

构造方法:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

构建线程池的7个核心参数:

  • 核心线程数:corePoolSize,线程池中保持的线程数量,即使在空闲状态下。
  • 最大线程数:maximumPoolSize,线程池中最多能创建多少个线程
  • 线程存活时间:keepAliveTime,当线程池中的线程数大于核心线程数时,如果一个线程空闲的时间达到最大存活时间,则会终止,直到线程池中的线程数不超过核心线程数
  • 线程存活时间的单位:unit,参数keepAliveTime的时间单位,有7种取值,是java.util.concurrent.TimeUnit类中枚举的7个常量值
  • 阻塞队列:workQueue,用来存储等待执行的任务
  • 线程工厂:threadFactory,主要用来创建线程,设置线程的名称等特性
  • 拒绝策略:handler,无法处理任务时的策略

线程池的工作机制

  • 当线程数目小于核心线程数时:每来一个任务,就会创建一个线程去执行这个任务
  • 当线程数目等于核心线程数但是阻塞队列没有满时:把提交的任务添加到阻塞队列
  • 当阻塞队列已满但是没有到最大线程数时:创建新的线程执行任务,直到最大线程数。
  • 当阻塞队列已满并且达到最大线程数时:执行拒绝策略,此时线程池已经无法执行任务
  • 当线程空闲时:如果线程池中的线程数量大于核心线程数时,当某线程空闲时间超过最大存活时间,线程将被终止,直至线程池中的线程数目不大于核心线程数

常见的阻塞队列

  • 基于数组的有界阻塞队列:ArrayBlockingQueue,队列按先进先出原则对元素进行排序,创建时需要指定队列长度
  • 基于链表的无界阻塞队列:LinkedBlockingQueue,同样是先进先出,因为队列无界,所以核心线程数满了之后,不会再创建线程了。使用无界队列需要预防任务堆积导致的OOM
  • 基于优先级的无界阻塞队列:PriorityBlockingQueue,用户可以指定元素的排序规则
  • 不存储元素的阻塞队列:SynchronousQueue,当线程池不需阻塞存队列时,可以使用它。向队列中新增元素和从队列中取元素两个操作是匹配的,插入元素时会检查是否有等待取元素的线程,如果没有,会阻塞,直到有线程来取元素。取元素时也是一样,会先检查有没有新增元素的线程,如果有,取元素,否则阻塞。

常见的拒绝策略

拒绝策略是指当线程池无法执行任务时该怎么办,线程池提供的拒绝策略:

  • 丢弃任务并抛异常:这是默认的拒绝策略,ThreadPoolExecutor.AbortPolicy
  • 丢弃任务但是不抛异常:ThreadPoolExecutor.DiscardPolicy
  • 丢弃队列最前面的任务,然后重新尝试执行任务,重复此过程:ThreadPoolExecutor.DiscardOldestPolicy
  • 由提交任务的线程来执行任务:ThreadPoolExecutor.CallerRunsPolicy

通过几个实例案例来学习线程池的api

案例1:向线程池中提交任务

使用execute方法、submit方法,向线程池中提交任务

public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建线程池,核心线程数2,最大线程数10,存活时间10s,使用数组作为阻塞队列,队列长度是5ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 10, 200, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));// 提交不需要返回值的任务pool.submit(() -> Utils.println("正在执行 task1"));pool.execute(() -> Utils.println("正在执行 task2"));// 提交需要返回值的任务Future<String> future = pool.submit(() -> "success");System.out.println("future.get() = " + future.get());// 等待所有任务结束后,关闭线程池pool.shutdown();
}

execute和submit方法的区别:

  • execute方法:执行没有返回值的任务。public void execute(Runnable command)
  • submit:可以执行没有返回值的任务,也可以执行有返回值的任务
    • Future<?> submit(Runnable task)
    • <T> Future<T> submit(Callable<T> task)
    • <T> Future<T> submit(Runnable task, T result)
案例2:当阻塞队列是有限阻塞队列时,观察线程池的运行

模拟:一共20个异步任务,每个任务执行耗时1秒,线程池的核心线程池数是2、最大线程数是5、阻塞队列的长度是15。开启一个新的线程,向线程池中提交任务,同时再开启一个线程,打印线程池的状态。根据我们的设置,线程池可以处理完这些任务,观察线程池在处理过程中各个参数的变化。

public static void main(String[] args) {// 创建线程池int corePoolSize = 2;int maximumPoolSize = 5;long keepAliveTime = 10;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(15);ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS, workQueue);// 在新线程中向线程池提交任务new Thread(() -> {for (int i = 1; i <= 20; i++) {threadPool.execute(new Runnable() {// 每个任务执行耗时一秒@Overridepublic void run() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}});}}).start();// 观察线程池的运行状态new Thread(() -> {for (int i = 0; i < 1000; i++) {System.out.println("[" + System.currentTimeMillis() + "]"+ ",核心线程数(参数)" + threadPool.getCorePoolSize()+ ",最大线程数(参数)" + threadPool.getMaximumPoolSize()+ ",当前线程数 = " + threadPool.getPoolSize()+ ",最大线程数(运行时) = " + threadPool.getLargestPoolSize()+ ",正在执行任务的线程数 = " + threadPool.getActiveCount()+ ",已完成任务数 = " + threadPool.getCompletedTaskCount()+ ",任务总数 = " + threadPool.getTaskCount()+ ",缓存队列中的任务数 = " + threadPool.getQueue().size()+ ",线程存活时间 = " + threadPool.getKeepAliveTime(TimeUnit.SECONDS) + "秒");try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();
}

总结:观察线程池的运行情况,基本符合之前提到的线程池的工作机制。

案例3:当阻塞队列是无界阻塞队列时,观察线程池的运行

模拟:一共20个异步任务,每个任务执行耗时1秒,线程池的核心线程池数是2、最大线程数是5、阻塞队列是无界阻塞队列

public static void main(String[] args) {// 创建线程池int corePoolSize = 2;int maximumPoolSize = 5;long keepAliveTime = 10;BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS, workQueue);// 在新线程中向线程池提交任务new Thread(() -> {for (int i = 1; i <= 20; i++) {threadPool.execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}});}}).start();// 观察线程池的运行状态new Thread(() -> {for (int i = 0; i < 1000; i++) {System.out.println("[" + System.currentTimeMillis() + "]"+ ",核心线程数(参数)" + threadPool.getCorePoolSize()+ ",最大线程数(参数)" + threadPool.getMaximumPoolSize()+ ",当前线程数 = " + threadPool.getPoolSize()+ ",最大线程数(运行时) = " + threadPool.getLargestPoolSize()+ ",正在执行任务的线程数 = " + threadPool.getActiveCount()+ ",已完成任务数 = " + threadPool.getCompletedTaskCount()+ ",任务总数 = " + threadPool.getTaskCount()+ ",缓存队列中的任务数 = " + threadPool.getQueue().size()+ ",线程存活时间 = " + threadPool.getKeepAliveTime(TimeUnit.SECONDS) + "秒");try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}

总结:观察日志中线程池的运行情况,基本可以印证之前提到的线程池的工作机制,无界阻塞队列,只要线程数等于核心线程数了,就不会再创建新线程了。

案例4:关闭线程池shutdown方法
public static void main(String[] args) {// 创建线程池int corePoolSize = 2;int maximumPoolSize = 5;long keepAliveTime = 10;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(15);ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS, workQueue);// 在新线程中向线程池提交任务new Thread(() -> {for (int i = 1; i <= 20; i++) {threadPool.execute(new Runnable() {@Overridepublic void run() {for (long i = 0; i < 9000000000L; i++) {}}});}}).start();// 观察线程池的运行状态new Thread(() -> {for (int i = 0; i < 500; i++) {System.out.println("[" + System.currentTimeMillis() + "]"+ ",线程池是否关闭 = " + threadPool.isShutdown()+ ",当前线程数 = " + threadPool.getPoolSize()+ ",正在执行任务的线程数 = " + threadPool.getActiveCount()+ ",已完成任务数 = " + threadPool.getCompletedTaskCount()+ ",任务总数 = " + threadPool.getTaskCount()+ ",缓存队列中的任务数 = " + threadPool.getQueue().size());try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("关闭线程池");threadPool.shutdown(); // 关闭线程池后,依旧会执行完剩下的任务
}

总结:调用shutdown方法关闭线程池后,依旧会执行完剩下的任务

案例5:关闭线程池 shutdownNow方法
public static void main(String[] args) {// 创建线程池int corePoolSize = 2;int maximumPoolSize = 5;long keepAliveTime = 10;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(15);ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS, workQueue);// 在新线程中向线程池提交任务new Thread(() -> {for (int i = 1; i <= 20; i++) {threadPool.execute(new Runnable() {@Overridepublic void run() {for (long i = 0; i < 9000000000L; i++) {}}});}}).start();// 观察线程池的运行状态new Thread(() -> {for (int i = 0; i < 100; i++) {System.out.println("[" + System.currentTimeMillis() + "]"+ ",线程池是否关闭 = " + threadPool.isShutdown()+ ",当前线程数 = " + threadPool.getPoolSize()+ ",最大线程数(运行时) = " + threadPool.getLargestPoolSize()+ ",正在执行任务的线程数 = " + threadPool.getActiveCount()+ ",已完成任务数 = " + threadPool.getCompletedTaskCount()+ ",任务总数 = " + threadPool.getTaskCount()+ ",缓存队列中的任务数 = " + threadPool.getQueue().size()+ ",线程存活时间 = " + threadPool.getKeepAliveTime(TimeUnit.SECONDS) + "秒");try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("关闭线程池");// 立刻关闭线程池,同时返回阻塞队列中的任务,正在执行任务的线程会被打断,// 如果线程处于休眠状态下,会抛异常,如果线程正在运行,会执行完当前的任务。List<Runnable> runnables = threadPool.shutdownNow();
}

总结:调用shutdownNow方法后,立刻关闭线程池,同时返回阻塞队列中的任务,正在执行任务的线程会被打断,如果线程处于休眠状态下,会抛异常,如果线程正在运行,会执行完当前的任务。

使用经验

如何选择阻塞队列

使用无界阻塞队列要注意内存溢出问题,因为很有可能阻塞了太多任务导致OOM

应该如何选择拒绝策略

当线程池被打满,无法执行新提交的任务时,应该选择何种拒绝策略?可以依据不同的场景,选择不同的拒绝策略。

  • 如果用户需要立刻知道任务失败,选择抛异常,这是默认的拒绝策略
  • 如果用户可以接受延迟,选择使用提交任务的线程来执行任务
  • 非关键性的任务,可以选择丢弃任务并且不抛异常,例如日志系统的收集
  • 如果是实时系统,可以选择丢弃最前面的任务,因为当前的任务价值最高

自定义拒绝策略:可以选择抛异常,并且把任务序列化到磁盘或数据库,系统不忙的时候在启动定时任务执行,尽管会有延迟,但是可以避免丢数据。

线程池的线程数如何设置

首先要看任务的类型是IO密集型,还是CPU密集型?线程数过多,会导致过多的上下文切换;线程数过少,会导致不能充分利用系统资源。

  • IO密集型:频繁读取磁盘上的数据,或者需要通过网络远程调用接口。IO密集型的任务,线程很多时候都会被阻塞,所以多创建一些线程,通常创建2N个线程,其中N代表CPU核数。
  • CPU密集型:非常复杂的调用,循环次数很多,或者递归调用层次很深等。CPU密集型的任务,线程通常不会被阻塞,并且过多的上下文切换会降低效率,所以少创建几个线程,通常创建N + 1个线程,其中N代表CPU核数。

创建线程池的快捷方法 Executors

Executors提供了多个静态方法去创建线程池,在使用上比ThreadPoolExecutor更简单,但是普遍不推荐使用。阿里提供的开发规范中更推荐使用ThreadPoolExecutor来直接创建线程池,这样可以增加对于线程池的控制力度。

Executors提供的创建线程池的方法:

  • 固定数目的线程池:newFixedThreadPool,创建一个线程池,核心线程数和最大线程数的值是相等的,使用无界阻塞队列
// 用户指定线程数即可,
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 可缓存线程池:newCachedThreadPool,将核心线程数设置为0,将最大线程数设置为int类型的最大值,来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程,阻塞队列中不存储元素,来了任务,就创建新线程执行或使用原有的空闲线程执行。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
  • 单个线程的线程池:newSingleThreadExecutor,线程池中只有一个线程,使用无界阻塞队列
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

定时任务线程池 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor,可以执行定时任务的线程池,它继承了ThreadPoolExecutor,添加了定时任务的功能。

通过几个实例案例来学习定时任务的api

案例1:只执行一次的定时任务

schedule方法:public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):通过schedule方法提交的定时任务,只会在指定时间后执行一次

public static void main(String[] args) {int corePoolSize = 3;ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize);// 执行定时任务,在指定时间后执行一次,这里是在任务提交后的3秒后执行一次打印任务System.out.println(System.currentTimeMillis() + " 开始计时");executor.schedule(() -> {System.out.println(System.currentTimeMillis() + " hello world");}, 3L, TimeUnit.SECONDS);executor.shutdown();
}
案例2:以固定频率重复执行的定时任务

scheduleAtFixedRate:public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):定时任务会以固定的频率执行

public static void main(String[] args) {int corePoolSize = 3;ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize);Utils.println("任务开始");// 任务提交后延迟1秒,并且每秒执行1次AtomicInteger i = new AtomicInteger(1);executor.scheduleAtFixedRate(() -> {int taskId = i.get();Utils.println("任务" + taskId + "开始"); // 工具方法,打印字符串的时候同时打印当前时间、线程try {Thread.sleep(new Random().nextInt(5) * 1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}Utils.println("任务" + taskId + "结束");i.set(taskId + 1);}, 1L, 1L, TimeUnit.SECONDS);
}

注意:如果前一个定时任务在下一个任务应该开始执行的时候还没有执行完成,下一个定时任务的执行时间会被延迟,并且它会在前一个定时任务执行完成后立刻执行

案例3:以固定延迟重复执行的定时任务

scheduleWithFixedDelay:public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):和scheduleAtFixedRate方法的功能类似,但是任务的下次执行时间受上次任务的执行时间的影响,因为它是上次任务执行完成之后,间隔指定时长,再执行下次任务

public static void main(String[] args) {int corePoolSize = 3;ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize);AtomicInteger i = new AtomicInteger(1);executor.scheduleWithFixedDelay(() -> {int taskId = i.get();Utils.println("任务" + taskId + "开始");try {Thread.sleep(new Random().nextInt(5) * 1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}Utils.println("任务" + taskId + "结束");i.set(taskId + 1);}, 1000L, 1000L, TimeUnit.MILLISECONDS);
}
实战案例:写一个每晚0点定时执行的定时任务

方法:先计算出当前时间到下一天0的之间的秒数,作为启动时的延迟,然后使用1天的秒数作为固定频率。

// 每天晚上0点固定执行的定时任务
public static void main(String[] args) {int corePoolSize = 3;ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(corePoolSize);poolExecutor.scheduleAtFixedRate(() -> {Utils.println("任务");}, getSecondsToNextDayMidnight(), 86400L, TimeUnit.SECONDS);
}// 获取当前时间到第二天0点的秒数
public static long getSecondsToNextDayMidnight() {// 获取当前时间LocalDateTime now = LocalDateTime.now();// 获取下一天0点的时间LocalDateTime nextDayMidnight = now.plusDays(1).with(LocalTime.MIDNIGHT);// 计算两个时间之间的秒数return ChronoUnit.SECONDS.between(now, nextDayMidnight);
}
案例5:有返回值的定时任务

定时任务使用ScheduledFuture来接收返回值,它继承了Future接口

public interface ScheduledFuture<V> extends Delayed, Future<V> { }

案例:

// 有返回值的定时任务
public static void main(String[] args) {int corePoolSize = 3;ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize);// 执行定时任务,在指定时间后执行一次System.out.println(System.currentTimeMillis() + " 开始计时");ScheduledFuture<String> scheduledFuture = executor.schedule(new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println(System.currentTimeMillis() + " hello world");return "result";}}, 3L, TimeUnit.SECONDS);try {String s = scheduledFuture.get();System.out.println("定时任务的结果 = " + s);} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}executor.shutdown();
}

使用经验

1、如果异步任务中还需要再启动一个异步任务,记得使用一个新的线程池,否则如果新的异步任务阻塞,会导致原先的异步任务也阻塞,从而线程无法释放,任务堆积到阻塞队列中,造成内存溢出。参考 https://heapdump.cn/article/646639
2、如果线程池的拒绝策略设置成DiscardPolicy或者DiscardOldestPolicy,一旦任务被丢弃,这些拒绝策略不会改变任务的状态,这个时候获取结果,get方法会一直阻塞。参考 https://juejin.cn/post/6864474891308482573

Q&A

1、核心线程数以外的线程,是怎么实现超过存活时间就移除的?

线程不执行任务时会一直阻塞地获取异步任务,如果当前线程数大于核心线程池数,线程在获取异步任务时会加上阻塞时长,超过阻塞时长,线程就会唤醒,此时会判断,如果线程没有获取到异步任务,就会从线程池中移出线程,这个阻塞时长就是线程的存活时长。


http://www.mrgr.cn/news/93263.html

相关文章:

  • AI 实战5 - pytorch框架实现face检测
  • 在S32K3上实现SOC的神经网络算法的可行性
  • io函数 day3 文件io与系统函数
  • 一篇文章讲解清楚ARM9芯片启动流程
  • ​Unity插件-Mirror使用方法(八)组件介绍(​Network Behaviour)
  • K8s 1.27.1 实战系列(一)准备工作
  • FastExcel/EasyExcel简介以及源码解析
  • 尚庭公寓项目记录
  • AD学习-最小系统板,双层
  • Ubuntu 22.04安装NVIDIA A30显卡驱动
  • Dify+DeepSeek | Excel数据一键可视化(创建步骤案例)(echart助手.yml)(文档表格转图表、根据表格绘制图表、Excel绘制图表)
  • VIA的寄生电感和Stub对高速信号的影响
  • 单细胞分析(21)——SCENIC 分析流程(singularity容器版)
  • RT-thread的MultiButton按键库的使用
  • 记录一次Spring事务失效导致的生产问题
  • 【DeepSeek 】学习编程的利器:DeepSeek 使用指南
  • 由麻省理工学院计算机科学与人工智能实验室等机构创建低成本、高效率的物理驱动数据生成框架,助力接触丰富的机器人操作任务
  • 给没有登录认证的web应用添加登录认证(openresty lua实现)
  • VsCode 快捷键备忘
  • DeepSeek、Grok 和 ChatGPT 对比分析:从技术与应用场景的角度深入探讨