Java:多线程(线程池,执行原理,优雅停止,延迟周期)
1,线程池
1.1,线程池介绍
线程池:是一种多线程处理形式,每个请求使用一个线程处理,所有线程预先创建,并发请求数大于线程数时排队等待。一方面系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互,另外一方面当用户数量很多时,会导致服务器崩溃,线程池可以限制并发用户最大数。在这种情形下,使用线程池可以 很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时(JDK21考虑虚拟线程),更应该考虑使用线程池。
线程池优点:
避免重复创建线程,减少在创建和销毁线程时所花时间,及系统的整体开销。
避免系统创建大量线程而消耗系统资源。
用户提交的数据能够及时得到处理,响应速度快。
能够更好的监控和管理线程。
线程池缺点:
线程池不支持线程的取消、完成、失败通知等交互性操作。
线程池不支持线程执行的先后次序排序。
不能设置池化线程(线程池内的线程)的Name,会增加代码调试难度。
池化线程通常都是后台线程,优先级为ThreadPriority.Normal。
池化线程阻塞会影响性能(阻塞会使CLR错误地认为它占用了大量CPU。CLR能够检测或补偿(往池中注入更多线程),但是这可能使线程池受到后续超负荷的印象。Task解决了这个问题)。
线程池使用的是全局队列,全局队列中的线程依旧会存在竞争共享资源的情况,从而影响性能(Task解决了这个问题方案是使用本地队列)。
与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象或 Callable对象传给线程池,线程池就会启动一个空闲的线程来执行它们的run()或call()方法,当run()或 call()方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个 Runnable对象的run()或call()方法。
从Java 5开始,Java内建支持线程池。Java 5新增了一个Executors工厂类来产生线程池,该工厂类包含 如下几个静态工厂方法来创建线程池。创建出来的线程池,都是通过ThreadPoolExecutor类来实现的:
- newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。
- newFixedThreadPool(int nThreads):创建一个可重用的、具有固定线程数的线程池。
- newSingleThreadExecutor():创建一个只有单线程的线程池,它相当于调用newFixedThread Pool()方法时传入参数为1。
- newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以周期型,指定延迟执行线程任务。
- newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
- newWorkStealingPool(int parallelism):创建持有足够的线程的线程池来支持 给定的并行级别,该方法还会使用多个队列来减少竞争。
- newWorkStealingPool():该方法是前一个方法的简化版本。如果当前机器有4个 CPU,则目标并行级别被设置为4,也就是相当于为前一个方法传入4作为参数。
上面7个方法中前三个方法返回一个ExecutorService对象,该对象代表一个线程池,它可以执行Runnable对象或Callable对象所代表的线程;而中间两个方法返回一个ScheduledExecutorService线程池,它是ExecutorService的子类,它可以在指定延迟后执行线程任务;最后两个方法则是Java 8新增的,这两个方法可以充分利用多CPU并行的能力。这两个方法生成的work stealing池,都相当于后台线程池,如果所有的前台线程都死亡了,work stealing池中的线程会自动死亡。
由于目前计算机硬件的发展日新月异,即使普通用户使用的电脑通常也都是多核CPU,因此Java 8在线程支持上也在增加了利用多CPU并行的能力,这样可以更好地发挥底层硬件的性能。
ExecutorService代表尽快执行线程的线程池(只要线程池中有空闲线程,就立即执行任务),程序只要将一个Runnable对象或Callable对象(代表线程任务)提交给该线程池,该线程池就会尽快执行该任务。ExecutorService里提供了如下三个方法:
- Future<?> submit(Runnable task):将一个Runnable对象提交给指定的线程池,线程池将在有空闲时执行Runnable对象所代表的任务。其中Future对象代表Runnable任务的返回值——但run()方法没有返回值,所以Future对象将在run()方法执行结束后返回null。但可以调用Future的isDone()、isCancelled()方法来获得Runable对象的状态。
- <T>Future<T> submit(Runnable task, T result):将一个Runnable对象 提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。其中result显式指定线程执行结束后的返回值,所以Futrue对象将在run()方法执行结束后返回result。
- <T>Future<T> submit(Callable<T> task):将一个Callable对象提交给指定的线程池,线程池将在有空闲线程时执行Callable对象代表的任务。其中Futrure代表Callable对象的call()方法的返回值。
ScheduledExecutorService代表可在指定延迟后或周期性地执行线程任务的线程池,它提供了如下四个方法:
- ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):指定callable任务将在delay延迟后执行。
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):指定command任务将在delay延迟后执行。
- ScheduledFuture<?> scheduleAtFixedRate(Runnable commond, long initialDelay, long period, TimeUnit unit):指定commond任务将在delay延迟后执行,而且设定频率重复执行。也就是说,在initialDelay后开始执行,依次在initialDelay+period、initialDelay+2*period、...处重复执行,依次类推。
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable commond, long initialDelay, long delay,TimeUnit unit):创建并执行一个在给定初始的延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务在任一次执行时遇到异常,就会取消后续执行;否则,只能通过程序来显式取消或终止该任务。
1.2,工作流程
- RUNNING:也是线程池的默认状态,当new一个ThreadPoolExecutor实例之后,这个ThreadPoolExecutor的状态就是运行态。
- SHUTDOWN:不清空任务队列,不接收新任务,但仍然会处理已经提交的任务。中断空闲线程。
- STOP:清空任务队列,中断正在运行的线程,以尝试停止它们(可能导致数据不一致)。此方法会返回一个列表,包含所有在调用时尚未开始执行的任务。如果正在运行的任务不响应中断(例如,没有检查中断标志),则它们可能仍会继续执行。
- TIDYING:当线程池中所有任务已被终止, 这个ThreadPoolExecutor实例就会进入停止态。
- TERMINATED:线程池已经关闭并且所有任务都已经完成。在终止状态下,线程池不再执行任何任务,且不能再被重新启动。进入TERMINATED的条件如下:
- 线程池不是RUNNING状态。
- 线程池状态不是TIDYING状态或TERMINATED状态。
- 如果线程池状态是SHUTDOWN并且workerQueue为空。
- workerCount为0。
- 设置TIDYING状态成功。
除terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
1.3,线程池的参数
- 判断核心线程池(corePoolSize)是否已满,没满则创建一个新的工作线程执行任务。
- 判断任务队列(workQueue)是否已满,没满则将新提交的任务添加在工作队列。
- 判断整个线程池是否已满(maximumPoolSize),没满则创建一个新的工作线程来执行任务,已满则执行拒绝策略。
- corePoolSize(核心工作线程数):当向线程池提交一个任务时,若线程池已创建的线程数小于 corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时。
- maximumPoolSize(最大线程数):线程池所允许的最大线程个数。当corePoolSize和队列已满,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。
- keepAliveTime(多余线程存活时间):当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。
- workQueue(队列):用于传输和保存等待执行任务的阻塞队列。
- threadFactory(线程创建工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池 的编号,n为线程池内的线程编号)。
- handler(拒绝策略):当corePoolSize和队列都满时,加入新任务会执行此策略。
1.4,拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- DiscardPolicy:也是丢弃任务,但是不抛出异常。
- DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复该过程)。
- CallerRunsPolicy:那么调用线程(也就是提交任务的线程)直接在自己的线程里执行,线程池不处理。这意味着任务不会被丢弃,而是在提交任务的线程上直接执行,从而避免了任务丢失的风险。
1.5,队列大小的设置
- CPU密集型任务:量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开 过多的线程数,会造成CPU过度切换。
- IO密集型任务:可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让 CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。
- 混合型任务:可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两 个任务的执行时间相差不大,那么就会比串行执行来的高效。因为如果划分之后两个任务执行时间 有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终的时间仍 然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
2,ThreadPoolExecutor
2.1,数据结构
public class ThreadPoolExecutor extends AbstractExecutorService { //...private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 存放任务的阻塞队列private final BlockingQueue<Runnable> workQueue; // 对线程池内部各种变量进行互斥访问控制private final ReentrantLock mainLock = new ReentrantLock(); // 线程集合private final HashSet<Worker> workers = new HashSet<Worker>(); //... }
每一个线程是一个Worker对象。Worker是ThreadPoolExecutor的内部类,核心数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// ...final Thread thread; // Worker封装的线程Runnable firstTask; // Worker接收到的第1个任务volatile long completedTasks; // Worker执行完毕的任务个数 // ... }
由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁,用于线程池的关闭、线程执行任务的过程中。
2.2,优雅停止(shutdown & shutdownNow)
关闭线程池的过程为:在调用shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用awaitTermination()来等待线程池关闭。关闭线程池的正确步骤如下:
// executor.shutdownNow(); executor.shutdown(); try {boolean flag = true; do {flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS); } while (flag); } catch (InterruptedException e) { // ... }
awaitTermination(...)方法不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判断。
shutdown():不会清空任务队列,会等待所有任务执行完成,shutdownNow()会清空任务队列。
//ThreadPoolExecutor public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查是否有关闭线程池权限checkShutdownAccess();// 将线程池状态修改为SHUTDOWNadvanceRunState(SHUTDOWN);// 中断空闲线程interruptIdleWorkers();// 具有空方法体的钩子方法onShutdown();} finally {mainLock.unlock();}tryTerminate(); }
shutdownNow():shutdown()只会中断空闲线程,shutdownNow()会中断所有线程。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将线程池的状态设置为STOPadvanceRunState(STOP);// 中断所有线程interruptWorkers();// 任务队列清空tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks; }
interruptIdleWorkers():
private void interruptIdleWorkers() {interruptIdleWorkers(false); } /*** 此方法用于在特定情况下中断一个或多个空闲的工作线程* 它主要用于在工作线程空闲时,根据需要中断它们,以控制线程的执行** @param onlyOne 如果为true,则只中断一个空闲工作线程;如果为false,则中断所有空闲工作线程*/ private void interruptIdleWorkers(boolean onlyOne) {// 获取主线程锁,以确保线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍历所有工作线程for (Worker w : workers) {Thread t = w.thread;// 检查线程是否已经被中断,并尝试获取工作线程的锁// 如果成功,表示线程处于空闲状态;如果不成功,表示线程持有锁,正在执行某个任务if (!t.isInterrupted() && w.tryLock()) {try {// 中断当前工作线程t.interrupt();} catch (SecurityException ignore) {// 捕获安全异常,此处忽略} finally {// 释放工作线程的锁w.unlock();}}// 如果只需要中断一个空闲工作线程,则跳出循环if (onlyOne)break;}} finally {// 释放主线程锁mainLock.unlock();} }
tryLock() & tryAcquire():
public boolean tryLock() { return tryAcquire(1); }/*** 尝试获取独占锁* 此方法用于在当前线程尝试获取锁时立即获取锁,而不会被挂起* * @param unused 未使用,保留参数,通常用于未来扩展* @return 如果成功获取锁,则返回true;否则返回false*/ protected boolean tryAcquire(int unused) {// 尝试通过CAS操作获取锁,如果当前状态为0,则设置状态为1,表示已获取锁if (compareAndSetState(0, 1)) {// 设置当前线程为锁的独占所有者setExclusiveOwnerThread(Thread.currentThread());return true;}// 如果获取锁失败,返回falsereturn false; }
interruptIfStarted():
/*** 中断所有线程,即使它们正在运行。忽略 SecurityException* (在这种情况下,某些线程可能不会被中断)。*/ private void interruptWorkers() {// 获取主锁以防止其他线程在中断期间访问共享资源final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍历所有工作线程并尝试中断它们for (Worker w : workers)w.interruptIfStarted();} finally {// 确保在任何情况下都释放主锁mainLock.unlock();} }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}} }
shutdown()和shutdownNow()都调用了tryTerminate()方法:tryTerminate()不会强行终止线程池,只是做了一下检测;当workQueue为空,workCount为0时,先把状态切换到TIDYING,然后调用钩子方法terminated()。当钩子方法执行完成时,把状态从TIDYING改为TERMINATED,接着调用termination.signalAll(),通知前面阻塞在awaitTermination的所有调用者线程。所以,TIDYING和TERMINATED的区别是在二者之间执行了一个钩子方法terminated(),目前是一个空实现。
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}// 当workQueue为空,workCount为0时,执行下述代码。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将状态切换到TIDYING状态if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated(); // 调用钩子函数} finally {ctl.set(ctlOf(TERMINATED, 0)); // 将状态有TIDYING改为TERMINATEDtermination.signalAll(); // 通知awaitTermination(...)}return;}} finally {mainLock.unlock();}// else retry on failed CAS} }
2.3,任务提交过程
提交任务的方法如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 如果当前线程数小于corePoolSize,则启动新线程if (workerCountOf(c) < corePoolSize) {// 添加worker,并将command设置为worker线程的第一个任务开始执行if (addWorker(command, true))return;c = ctl.get();}// 如果当先的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求if (! isRunning(recheck) && remove(command))reject(command);// 放入队列中后发现没有线程执行任务,开启新线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 线程数大于maxPoolSize,并且队列已满,则调用拒绝策略else if (!addWorker(command, false))reject(command); }
// 该方法用于启动新线程。如果第二个参数为true,则使用corePoolSize作为上线,否则使用maxPoolSize作为上线 private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// 如果线程池状态值 起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列为空// 则添加worker失败,返回falseif (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {// 工作线程数达到上线,要么是corePoolSize要么是maximumPoolSize,启动线程失败if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// 增加worker数量成功,返回到retry语句if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl// 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CASif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}// worker数量加1成功后,接着运行boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 新建worker对象w = new Worker(firstTask);// 获取线程对象final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 加锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {// 由于线程已经在运行中,无法启动,抛异常if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 将线程对应的worker加入worker集合workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {// 释放锁mainLock.unlock();}// 如果添加worker成功,则启动该worker对应的线程if (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果启动新线程失败if (! workerStarted)// workCount - 1addWorkerFailed(w);}return workerStarted; }
2.4,任务执行过程
在上面的任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取出任务执行,这是一个不断循环的过程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// 当前worker对象封装的线程final Thread thread;// 线程需要运行的第一个任务。可以是null,如果是null,则线程从队列获取任务 Runnable firstTask;// 记录线程执行完成的任务数量,每个线程一个计数器 volatile long completedTasks;/*** 适应给定的第一个任务并利用线程工厂创建Worker实例* @param firstTask 线程的第一个任务,如果没有,就设置为null,此时线程会从队列获取任务*/Worker(Runnable firstTask) {setState(-1); // 线程处于阻塞状态,调用runWorker的时候中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 调用ThreadPoolExecutor的runWorker方法执行线程的运行 public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 中断Worker封装的线程w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执行任务。while (task != null || (task = getTask()) != null) {// 获取线程锁w.lock();// 如果线程池停止了,去报线程被中断// 如果线程池正在运行,确保线程不被中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 获取到任务后,再次检查线程池状态,如果发现线程池已经停止,给自己发中断信号wt.interrupt();try {// 任务执行之前的钩子方法,实现为空beforeExecute(wt, task);try {task.run();// 任务执行结束后的钩子方法,实现为空afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {// 任务执行完成,将task设置为nulltask = null;// 线程已完成的任务数加1w.completedTasks++;// 释放线程锁w.unlock();}}// 判断线程是否正常退出completedAbruptly = false;} finally {// Worker退出processWorkerExit(w, completedAbruptly);}} }
2.5,线程池关闭过程
【shutdown()与任务执行过程综合分析】当调用shutdown()的时候,可能出现以下几种场景:
当调用shutdown()的时候,所有线程都处于空闲状态。这意味着任务队列一定是空的。此时,所有线程都会阻塞在getTask()方法的地方。然后,所有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都会退出while循环,之后执行processWorkerExit。
当调用shutdown的时候,所有线程都处于忙碌状态。此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null。之后,就和场景1一样,退出while循环。
当调用shutdown()的时候,部分线程忙碌,部分线程空闲。有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在getTask()方法的地方。空闲的这些线程和场景1一样处理,不空闲的线程会和场景2一样处理。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 如果线程池调用了shutdownNow(),返回null// 如果线程池调用了shutdown(),并且任务队列为空,也返回nullif (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {// 工作线程数减1decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 如果队列为空,就会阻塞pool或者take,前者有超时时间,后者没有超时时间// 一旦中断。此处抛异常,对应上文场景1Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }
【shutdownNow()与任务执行过程综合分析】和上面的shutdown()类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程的中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。当一个Worker最终退出的时候,会执行清理工作:
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果线程正常退出,不会执行if语句,这里一般是非正常退出,需要将worker数量减1if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;// 将自己的worker从集合移除workers.remove(w);} finally {mainLock.unlock();}// 每个线程在结束的时候都会用该方法,看是否可以停止线程池tryTerminate();int c = ctl.get();// 如果在线程池退出前,发现线程池还没有关闭if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果线程池中没有其他线程来,并且任务队列非空if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果工作线程数大于min,表示队列中的任务可以由其他线程执行,退出当前线程if (workerCountOf(c) >= min)return; // replacement not needed}// 如果当前线程退出前发现线程池没有结束,任务队列不是空的,也没有其他线程来执行// 就再启动一个线程来处理addWorker(null, false);} }
3,ScheduledThreadPoolExecutor
3.1,延迟执行和周期性执行原理
ScheduledThreadPoolExecutor实现了按时间调度来执行任务:
- AtFixdRate:按固定频率执行,与任务本身运行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。
- WithFixedDelay:按固定间隔执行,与任务本身执行时间有关,例如本身执行时间是10s,间隔2s,则下一次开始执行的时间是12s。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- 延迟执行任务依靠的是DelayQueue。DelayQueue是BlockingQueue的一种,其实现原理是二叉堆。不过这里并没有没有使用DelayQueue,而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue。
- 而周期性执行任务时执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。
3.2,延迟执行
schedule()方法本身很简单,就是把提交的Runnable任务加上delay时间,转换成ScheduledFutureTask对象,放入DelaydWorkerQueue中。任务的执行过程还是复用ThreadPoolExecutor,延迟的控制是在DelayedWorkerQueue内部完成的。
/*** 此方法用于安排一个任务,在指定延迟后执行一次。它提供了一种非阻塞方式安排任务的方法,支持带有延迟的任务调度,* 对于实现定时器或延迟初始化等功能非常有用。* @param command 要执行的任务,表示为Runnable。如果任务为null,将抛出NullPointerException。* @param delay 任务开始执行前的延迟时间。如果延迟小于或等于零,任务将被立即安排执行。* @param unit 延迟参数的时间单位,例如SECONDS、MINUTES等。如果unit为null,将抛出NullPointerException。* @return 返回一个ScheduledFuture对象,可以用来取消任务或查询任务状态。*/ public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();// 创建一个装饰后的任务,使用ScheduledFutureTask包装原始任务RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 执行延迟任务delayedExecute(t);return t; }
传进去的是一个Runnable,外加延迟时间delay。在内部通过decorateTask(...)方法把Runnable包装成一个ScheduledFutureTask对象,而DelaydWorkQueue中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed接口。
/*** 延迟或周期任务的主要执行方法。如果线程池已关闭,则拒绝任务。否则将任务添加到队列中,* 并在必要时启动一个线程来运行任务。(我们不能预先启动线程来运行任务,因为任务可能还没有准备好运行。)* 如果在线程池关闭期间添加任务,根据状态和关闭后运行参数,取消并移除任务。** @param task 要执行的任务*/ private void delayedExecute(RunnableScheduledFuture<?> task) {// 检查线程池是否处于关闭状态,如果是则拒绝任务if (isShutdown())reject(task);else {// 将任务添加到队列中super.getQueue().add(task);// 如果在线程池关闭期间添加任务,并且任务的执行状态不允许,// 根据状态和关闭后运行参数取消并移除任务if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) && remove(task))task.cancel(false);else// 确保启动一个线程来运行任务ensurePrestart();} }
/*** 启动所有核心线程,使它们空闲等待任务。这将覆盖默认策略,即只有在执行新任务时才启动核心线程。** @return 返回启动的线程数量*/ public int prestartAllCoreThreads() {int n = 0;// 循环添加工作线程,直到无法再添加为止while (addWorker(null, true))++n;return n; }
3.3,周期性执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();// 创建一个新的 ScheduledFutureTask 实例ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 装饰任务,可能添加额外的行为RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;// 延迟执行任务delayedExecute(t);return t; }
和schedule(...)方法的框架基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkedQueue就结束了。两个方法的区别在于一个传入的周期是一个负数,另一个传入的周期是一个整数。
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {private final long sequenceNumber;private volatile long time;private final long period;ScheduledFutureTask(Runnable r, V result, long triggerTime,long period, long sequenceNumber) {super(r, result);this.time = triggerTime; // 延迟时间this.period = period; // 周期this.sequenceNumber = sequenceNumber;}// 实现Delayed接口public long getDelay(TimeUnit unit) {return unit.convert(time - System.nanoTime(), NANOSECONDS);}// 实现Comparable接口public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;// 延迟时间相等,进一步比较序列号else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}// 实现Runnable接口public void run() {if (!canRunInCurrentRunState(this))cancel(false);// 如果不是周期执行,则执行一次else if (!isPeriodic())super.run();// 如果是周期执行,则重新设置下一次运行的时间,重新入队列else if (super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}// 下一次执行时间private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);} }// 下一次触发事件 long triggerTime(long delay) {return System.nanoTime() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }// 放到队列中,等待下一次执行 void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(task)) {super.getQueue().add(task);if (canRunInCurrentRunState(task) || !remove(task)) {ensurePrestart();return;}}task.cancel(false); }
withFixedDelay和atFixedRate的区别就体现在setNextRunTIme里面:
- 如果是atFixedRate,period > 0,下一次开始执行时间等于上一次开始开始执行时间+period;
- 如果是withFixedDelay,period < 0,下一次开始执行时间等于tiggerTime(-p),为now+(-period),now即上一次执行的结束时间。