剖析源码,带你看懂JUC线程池运行机制
文章目录
- 1.线程池基础
- 1.1 概念
- 1.2 作用
- 1.3 创建方式
- 1.3.1 构造方法实现
- 1.3.2 Executors类静态方法实现
- 1 .4 类型
- 1.4.1 按线程池数目
- 1.4.2 按线程池功能
- 2.线程池工作原理
- 2.1 任务提交 execute
- 2.2 增加工作线程addWorker方法
- 2.3 工作线程运行原理runWorker方法
- 3.扩展补充
- 3.1 拒绝策略
- 3.3shutdown 和shutdownNow 的区别
- 3.4 与ForkJoinPool区别
- 4.小结
1.线程池基础
1.1 概念
借助池化思想,预先构建指定数目的线程用以处理系统任务,其中若干个线程形成一个线程池。池中通常会维护一定数目的线程来处理系统任务,也会根据执行任务的繁忙程度,动态扩增/缩减线程数目,具备动态调整处理系统任务速度的能力。
1.2 作用
1.避免线程频繁创建和销毁,提前创建指定数目的线程用以线程复用
,降低资源开销
以及提升系统响应速度
。
2.便于对线程资源限制和统一管理
,还可以根据任务情况动态实现增减线程
,提升资源利用率
。【这个作用面试经常问】
1.3 创建方式
JDK中创建线程池使用下面这个构造方法来实现的,除了工作窃取线程ForkJoinPool
。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
corePoolSize:核心线程数。 //线程池中一直存在的线程》
maximumPoolSize: 最大线程数。//线程池中最多允许线程
keepAliveTime: 线程空闲存活时间。 //多余空闲线程回收最长等待时间
unit: 时间单位。
workQueue: 工作队列【实现BlokingQueue接口】。 // 存待处理任务
threadFactory: 线程工厂实例
handler:任务队列满了的拒绝策略
1.3.1 构造方法实现
比如new ThreadPoolExecutor() 实现,自定义设置参数,功能灵活,适用于复杂业务场景自定义实现。对使用者要求较高。
1.3.2 Executors类静态方法实现
如下图所示,使用简单,可调整参数少,提供了很多默认实现,适用于场景业务简单。
// 固定线程池,最大和最小线程数相等都是 nThreads
Executors.newFixedThreadPool(int nThreads)
// parallelism 并行处理数,可以不传,默认就是
// 这个Runtime.getRuntime().availableProcessors() CPU处理核数。
Executors.newWorkStealingPool(int parallelism)
// 最大线程数为Integer.MAX_VALUE,最小线程为0,使用很少。
// 线程太多CPU直接跑慢。
Executors.newCachedThreadPool()
// 定时任务线程池,最大线程数为Integer.MAX_VALUE。
Executors.newScheduledThreadPool(int corePoolSize)
// 单个线程构成的线程池
Executors.newSingleThreadExecutor()
1 .4 类型
1.4.1 按线程池数目
1.单线程线程池 newSingleThreadExecutor
2.固定线程线程池 newFixedThreadPool
3.无限线程线程池 newCachedThreadPool
1.4.2 按线程池功能
- 普通线程池ThreadPool
// 上面三种都是普通线程池,通常用于处理IO密集任务,比如大量http请求等- 工作窃取ForkJoinPool
// 主要用来处理CPU密集型任务,使用少量的线程并行处理大量的计算型任务。- 定时任务线程池scheduledThreadPool
// 处理延时任务或者是定期执行的任务。比如日志定期删除,缓存定期删除,消息定时发送等
2.线程池工作原理
多线程处理任务大致流程是,首先要构建线程池,其次提交任务给线程池,线程池接收任务,则自动会创建工作线程并运行,来处理任务,如果任务太多,处理不过来则会通过新建工作线程或者是将任务放如工作队列中。其中最为重要的几个方法分别是,提交任务
execute
,新建工作线程addWorker
,工作线程运行runWorker
这三个方法。
2.1 任务提交 execute
通过JDK21源码来剖析执行机制
public void execute(Runnable command) {if (command == null)throw new NullPointerException();// ctl使用AtomicInteger 存储的对象,高3为存储线程状态,低29为存储状态下响应线程数 int c = ctl.get();// 按位与操作计算c低29位的值 得到当前线程数// 如果当前线程池小于核心线程池则新建线程成功则结束,未成功继续执行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 线程正在运行 让工作队列加任务,且添加成功if (isRunning(c) && workQueue.offer(command)) {// 二次检查int recheck = ctl.get();// 线程池不是运行状态 则删除任务。有可能存在使用者直接shutdownNow/shutdown 方法直接停止线程池。if (! isRunning(recheck) && remove(command))// 拒绝策略。reject(command);// 如果当前线程为0,可能存在重新启动情况线程刚好为空。else if (workerCountOf(recheck) == 0)// 新增工作线程addWorker(null, false);}// 工作队列满了,新增工作线程,新增失败则执行拒绝策略。else if (!addWorker(command, false))reject(command);}
2.2 增加工作线程addWorker方法
JDK21源码分析,
/*** RUNNING -> SHUTDOWN* On invocation of shutdown()* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow()* SHUTDOWN -> TIDYING* When both queue and pool are empty* STOP -> TIDYING* When pool is empty* TIDYING -> TERMINATED* When the terminated() hook method has completed*private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
**/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.// 当线程池处为空并且工作队列为空,此时不增加线程池。// 可结合上面注释代码看懂。if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {// core变量为false 情况下,超过最大线程数直接失败。if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// cas 机制,只有一个线程成功 if (compareAndIncrementWorkerCount(c))break retry;// c = ctl.get(); // Re-read ctl// cas 失败 重试。if (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个工作线程Worker对象w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 获取锁 可能存在多个线程调用了execute方法。mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();// 判断是否是运行状态或者是重启状态if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {// 如果不是新建线程状态抛异常if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();// 增加至workers工作线程hashSet实现。workers.add(w);workerAdded = true;int s = workers.size();// 这工作线程数目则重新赋值。这个是计算当前线程池实际运行的最大线程池数if (s > largestPoolSize)largestPoolSize = s;}} finally {//释放锁mainLock.unlock();}if (workerAdded) {// 底层还是调用Thread.start方法来开启线程container.start(t);workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
2.3 工作线程运行原理runWorker方法
通过源码来分析:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 当前任务或者工作队列任务不为空。循环处理while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 任务执行前逻辑,这个默认实现为空。// JUC大佬采用模板模式,先给开发人员占个位置,让开发者可自定义实现。beforeExecute(wt, task);try {// 执行任务task.run();// 执行后逻辑,方法体为空。交由开发人员自定制实现afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {// 任务清空 GC回收task = null;// 线程处理任务数加一w.completedTasks++;// 释放锁w.unlock();}}completedAbruptly = false;} finally {// 工作线程回收,如果回收后,发现当前线程小于核心线程,会继续创建一个线程加入线程池中。processWorkerExit(w, completedAbruptly);}}
3.扩展补充
3.1 拒绝策略
JUC默认四种策略:
AbortPolicy: 直接抛异常。默认实现策略。
DiscardPolicy: 默默丢弃,不抛异常。
DiscardOldestPolicy: 使用poll丢弃一个任务。
CallerRunsPolicy: 交由调用者所在线程执行。
自定义拒绝策略:只需要实现RejectedExecutionHandler 这个接口重写rejectedExecution 方法接口即可。一个简单示例如下:
比如com.zaxxer.hikari.pool.HikariPool 实现了自定义注册策略
3.3shutdown 和shutdownNow 的区别
shutdown:
执行后,线程池不会再接收新任务,但是处于工作队列的任务还是要执行完,方法立刻返回,不会等待队列任务完成返回。【异步】
shutdownNow:
线程池不接受新任务,工作队列任务直接丢弃,正在执行任务会直接中断,立刻返回,返回值为队列里面被丢弃的任务列表。
3.4 与ForkJoinPool区别
1.ForkJoinPool 每个线程单独有个队列,ThreadPoolExecutor中只有一个队列。
2.ForkJoinPool 采用双端队列实现,执行任务采用LIFO顺序,窃取任务从尾部获取任务,有效减少任务竞争,当任务只剩一个时候,允许线程之间CAS来竞争获取锁。ThreadPoolExecutor采用FIFO排队方式执行任务。
3.ThreadPoolExecutor适用于IO密集型,ForkJoinPool 适用于CPU密集型,最好是非阻塞任务。
4.小结
本文通过源码分析深度剖析了JUC下线程池运行机制,并对线程池类型和使用做了简单介绍,希望能给大家带来些许帮助,如文中存在错误,可以在评论区交流指正。