Java线程池解密
从ThreadPoolExecutor类下手,深入探究Java线程池原理。
继承类图:
经典八股:线程池执行任务大概是这样的流程,来一个任务首先创建核心线程去执行,核心线程都在工作,那么会放到任务队列里,如果任务队列是个有界队列,且队列满了,则会让救急线程去执行,救急线程等到一定时间没有执行任务,则会自动结束掉。如果救急线程也都在工作,那么会执行拒绝策略。这个ThreadPoolExecutor类中的拒绝策略内部类有四个:直接抛弃、抛异常、让调用者执行、抛弃掉队列最早的任务。
为什么是这样?来探究一下。
核心属性分析
ctl
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。并且这个int用的是原子类。
(本图源来自小傅哥)
所以状态就能比较大小了,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
为什么要用一个字段来表示两个值,状态和数量分别用两个字段表示不行吗?
目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值
计算线程数量:
计算线程池状态:
在ThreadPoolExecutor类的所有方法中,变量名都用rs表示线程池状态,wc表示线程数量。后面看到这两个变量名就知道代表的是什么意思。
内部类Worker
在该线程池中,池中的每一个工作线程都是一个Worker对象,Worker是ThreadPoolExecutor的一个内部类:
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
所以Worker可以认为就是一个Runnable任务,并且具有AQS锁的性质。Runnable可以干嘛?可以丢到Thread类里调用start()方法去运行Runnable任务。ThreadPoolExecutor线程池也是这么干的。
workers
线程池子本质上就是一个set集合:
(任何高大上的东西,只要看透了它的本质,其实就那么回事。只是你自己心里给它加上了一个高大上的滤镜而已)
拒绝策略
当构造方法中没有指定拒绝策略使,默认拒绝策略是丢弃该任务
核心方法分析
用线程池执行任务 execute
第一个if块:workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果
线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务。
第二个if块:断线程池是否在运行,如果在,任务队列是否允许插入,插入成功再次验证线程池是否运
行,如果不在运行,移除插入的任务,然后抛出拒绝策略。如果在运行,没有线程了,就启用一个线程。
第三个else if块:如果添加非核心线程失败,就直接拒绝了。
其实核心线程和非核心线程本质上没有任务区别,都是一个worker对象,worker里也没有任何一个字段来标记是核心线程还是非核心线程,addWorker方法的core参数为false的意思是以maximumPoolSize线程数为上限创建worker。那非核心线程到达一定时间就会自动释放,而核心线程不会释放,这个是怎么实现的?后面说
添加worker线程 addWorker
addWorker主要负责创建新的worker任务,在Worker的构造方法里会使用线程工厂创建一个线程并将该worker任务丢到新线程里,然后在addWorker方法中调用Thread的start()方法执行任务,start()会调用Runnable的run(),Worker类重写了Runnable的run方法。
Worker构造方法及重写的run方法:
addWorker过程:
(goto是java中的强制跳转的语法,c或c++等其它语言也有类似语法)
总共可以分为两个黄框,两大块。第一块:cas自旋使线程数量+1;第二块:创建新的worker,将worker添加到set集合中,然后启动新线程。
执行worker任务 runWorker
上个方法中的t.start()调用后,就会执行Worker类实现的Runnable接口的run方法,run方法又会调用ThreadPoolExecutor类的runWorker方法
runWorker负责具体怎么执行任务。
几点说明:
- 这两个钩子允许我们自己继承线程池,做任务执行前后的处理
- 先看个代码:调用的是run方法而非start方法
如果我们自己调用Runnable的run方法,那么会在调用run方法的当前方法也就是main方法里执行Runnable任务。
runWorker()里也是自己调用的run(),那这里也不是用新线程执行的了?
不是的,这里我们就是在新线程里调用的run(),会在当前这个新线程里执行。上面图片中的当前线程是main线程,这里的当前线程是新线程。别忘了,这个方法是从Worker的构造方法
this.thread = getThreadFactory().newThread(this);------->addWorker()里的t.start();------>Worker里的run()一步步执行来的。
- 前面说的非核心线程自动释放,而核心线程不会,原理就在while循环里调用的getTask()方法
-
- finally块中的processWorkerExit方法中有线程数量-1和从workers.remove(w)。能走到finally块了,说明while循环退出,该线程是救急线程,要被释放了。
获取任务 getTask
获取任务 getTask
非核心线程自动释放原理:wc>corePoolSize当前线程数已经超出了核心线程数,timed为true,
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
走超时获取任务poll方法,指定时间获取不到,poll返回null,if(r!=null)不成立,timeOut赋值为true,进入下轮循环,下轮循环中
if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;
}
if块成立,方法退出,getTask返回null,调用getTask的runWorker方法的while循环不成立,线程的代码执行完了,线程也就释放了!!!
从这也能看出,核心线程和非核心线程本身没有任何区别,某一个线程getTask获取任务时,获取不到任务也就释放了,线程池也不知道释放的是核心线程还是非核心线程,谁被释放掉了,谁就是非核心线程。
其余线程(再说非核心线程都不太合适了,应该说其余线程)没有任务也不会释放原因:当前线程数wc已经不大于corePoolSize了执行的是BlockingQueue的take阻塞获取方法,该方法获取不到会一直卡着,直到获取到任务。
所以一旦向线程池中添加的线程数大于等于了corePoolSize了,那么线程池中就会始终存在至少corePoolSize个线程,直到停掉线程池。
以经典的LinkedBlockingQueue为例看看这两个获取方法:
take方法中,当count为0时会将该线程在notEmpty这个Condition上等待,直到其它线程往队列放东西时将该线程唤醒
基于以上,自己尝试手写了一个简易线程池:github地址