当前位置: 首页 > news >正文

Java:多线程(线程池,执行原理,优雅停止,延迟周期)

1,线程池

1.1,线程池介绍

线程池:是一种多线程处理形式,每个请求使用一个线程处理,所有线程预先创建,并发请求数大于线程数时排队等待。一方面系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互,另外一方面当用户数量很多时,会导致服务器崩溃,线程池可以限制并发用户最大数。在这种情形下,使用线程池可以 很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时(JDK21考虑虚拟线程),更应该考虑使用线程池。

线程池优点:

  • 避免重复创建线程,减少在创建和销毁线程时所花时间,及系统的整体开销。

  • 避免系统创建大量线程而消耗系统资源。

  • 用户提交的数据能够及时得到处理,响应速度快。

  • 能够更好的监控和管理线程。

线程池缺点:

  • 线程池不支持线程的取消、完成、失败通知等交互性操作。

  • 线程池不支持线程执行的先后次序排序。

  • 不能设置池化线程(线程池内的线程)的Name,会增加代码调试难度。

  • 池化线程通常都是后台线程,优先级为ThreadPriority.Normal。

  • 池化线程阻塞会影响性能(阻塞会使CLR错误地认为它占用了大量CPU。CLR能够检测或补偿(往池中注入更多线程),但是这可能使线程池受到后续超负荷的印象。Task解决了这个问题)。

  • 线程池使用的是全局队列,全局队列中的线程依旧会存在竞争共享资源的情况,从而影响性能(Task解决了这个问题方案是使用本地队列)。

与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象或 Callable对象传给线程池,线程池就会启动一个空闲的线程来执行它们的run()或call()方法,当run()或 call()方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个 Runnable对象的run()或call()方法。

从Java 5开始,Java内建支持线程池。Java 5新增了一个Executors工厂类来产生线程池,该工厂类包含 如下几个静态工厂方法来创建线程池。创建出来的线程池,都是通过ThreadPoolExecutor类来实现的:

  • newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。
  • newFixedThreadPool(int nThreads):创建一个可重用的、具有固定线程数的线程池。
  • newSingleThreadExecutor():创建一个只有单线程的线程池,它相当于调用newFixedThread Pool()方法时传入参数为1。
  • newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以周期型,指定延迟执行线程任务。
  • newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
  • newWorkStealingPool(int parallelism):创建持有足够的线程的线程池来支持 给定的并行级别,该方法还会使用多个队列来减少竞争。
  • newWorkStealingPool():该方法是前一个方法的简化版本。如果当前机器有4个 CPU,则目标并行级别被设置为4,也就是相当于为前一个方法传入4作为参数。

上面7个方法中前三个方法返回一个ExecutorService对象,该对象代表一个线程池,它可以执行Runnable对象或Callable对象所代表的线程;而中间两个方法返回一个ScheduledExecutorService线程池,它是ExecutorService的子类,它可以在指定延迟后执行线程任务;最后两个方法则是Java 8新增的,这两个方法可以充分利用多CPU并行的能力。这两个方法生成的work stealing池,都相当于后台线程池,如果所有的前台线程都死亡了,work stealing池中的线程会自动死亡。

由于目前计算机硬件的发展日新月异,即使普通用户使用的电脑通常也都是多核CPU,因此Java 8在线程支持上也在增加了利用多CPU并行的能力,这样可以更好地发挥底层硬件的性能。

ExecutorService代表尽快执行线程的线程池(只要线程池中有空闲线程,就立即执行任务),程序只要将一个Runnable对象或Callable对象(代表线程任务)提交给该线程池,该线程池就会尽快执行该任务。ExecutorService里提供了如下三个方法:

  • Future<?> submit(Runnable task):将一个Runnable对象提交给指定的线程池,线程池将在有空闲时执行Runnable对象所代表的任务。其中Future对象代表Runnable任务的返回值——但run()方法没有返回值,所以Future对象将在run()方法执行结束后返回null。但可以调用Future的isDone()、isCancelled()方法来获得Runable对象的状态。
  • <T>Future<T> submit(Runnable task, T result):将一个Runnable对象 提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。其中result显式指定线程执行结束后的返回值,所以Futrue对象将在run()方法执行结束后返回result。
  • <T>Future<T> submit(Callable<T> task):将一个Callable对象提交给指定的线程池,线程池将在有空闲线程时执行Callable对象代表的任务。其中Futrure代表Callable对象的call()方法的返回值。

ScheduledExecutorService代表可在指定延迟后或周期性地执行线程任务的线程池,它提供了如下四个方法:

  • ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):指定callable任务将在delay延迟后执行。
  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):指定command任务将在delay延迟后执行。
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable commond, long initialDelay, long period, TimeUnit unit):指定commond任务将在delay延迟后执行,而且设定频率重复执行。也就是说,在initialDelay后开始执行,依次在initialDelay+period、initialDelay+2*period、...处重复执行,依次类推。
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable commond, long initialDelay, long delay,TimeUnit unit):创建并执行一个在给定初始的延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务在任一次执行时遇到异常,就会取消后续执行;否则,只能通过程序来显式取消或终止该任务。

1.2,工作流程

  • RUNNING:也是线程池的默认状态,当new一个ThreadPoolExecutor实例之后,这个ThreadPoolExecutor的状态就是运行态。
  • SHUTDOWN:不清空任务队列,不接收新任务,但仍然会处理已经提交的任务。中断空闲线程。
  • STOP:清空任务队列,中断正在运行的线程,以尝试停止它们(可能导致数据不一致)。此方法会返回一个列表,包含所有在调用时尚未开始执行的任务。如果正在运行的任务不响应中断(例如,没有检查中断标志),则它们可能仍会继续执行。
  • TIDYING:当线程池中所有任务已被终止, 这个ThreadPoolExecutor实例就会进入停止态。
  • TERMINATED:线程池已经关闭并且所有任务都已经完成。在终止状态下,线程池不再执行任何任务,且不能再被重新启动。进入TERMINATED的条件如下:
    • 线程池不是RUNNING状态。
    • 线程池状态不是TIDYING状态或TERMINATED状态。
    • 如果线程池状态是SHUTDOWN并且workerQueue为空。
    • workerCount为0。
    • 设置TIDYING状态成功。

除terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法: 

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

1.3,线程池的参数

  • 判断核心线程池(corePoolSize)是否已满,没满则创建一个新的工作线程执行任务。
  • 判断任务队列(workQueue)是否已满,没满则将新提交的任务添加在工作队列。
  • 判断整个线程池是否已满(maximumPoolSize),没满则创建一个新的工作线程来执行任务,已满则执行拒绝策略。
  • corePoolSize(核心工作线程数):当向线程池提交一个任务时,若线程池已创建的线程数小于 corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时。
  • maximumPoolSize(最大线程数):线程池所允许的最大线程个数。当corePoolSize和队列已满,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。
  • keepAliveTime(多余线程存活时间):当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。
  • workQueue(队列):用于传输和保存等待执行任务的阻塞队列。
  • threadFactory(线程创建工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池 的编号,n为线程池内的线程编号)。
  • handler(拒绝策略):corePoolSize和队列都满时,加入新任务会执行此策略。

1.4,拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  • AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复该过程)。
  • CallerRunsPolicy:那么调用线程(也就是提交任务的线程)直接在自己的线程里执行,线程池不处理。这意味着任务不会被丢弃,而是在提交任务的线程上直接执行,从而避免了任务丢失的风险。

1.5,队列大小的设置

  • CPU密集型任务:量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开 过多的线程数,会造成CPU过度切换。
  • IO密集型任务:可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让 CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。 
  • 混合型任务:可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两 个任务的执行时间相差不大,那么就会比串行执行来的高效。因为如果划分之后两个任务执行时间 有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终的时间仍 然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

2,ThreadPoolExecutor

2.1,数据结构

public class ThreadPoolExecutor extends AbstractExecutorService {    //...private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    // 存放任务的阻塞队列private final BlockingQueue<Runnable> workQueue;    // 对线程池内部各种变量进行互斥访问控制private final ReentrantLock mainLock = new ReentrantLock();    // 线程集合private final HashSet<Worker> workers = new HashSet<Worker>();    //...
}

每一个线程是一个Worker对象。Worker是ThreadPoolExecutor的内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// ...final Thread thread; // Worker封装的线程Runnable firstTask; // Worker接收到的第1个任务volatile long completedTasks; // Worker执行完毕的任务个数    // ...
}

由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁,用于线程池的关闭、线程执行任务的过程中。

2.2,优雅停止(shutdown & shutdownNow)

关闭线程池的过程为:在调用shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用awaitTermination()来等待线程池关闭。关闭线程池的正确步骤如下:

// executor.shutdownNow(); 
executor.shutdown();
try {boolean flag = true;    do {flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);    } while (flag);
} catch (InterruptedException e) {    // ...
}

awaitTermination(...)方法不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判断。

shutdown():不会清空任务队列,会等待所有任务执行完成,shutdownNow()会清空任务队列。

//ThreadPoolExecutor
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查是否有关闭线程池权限checkShutdownAccess();// 将线程池状态修改为SHUTDOWNadvanceRunState(SHUTDOWN);// 中断空闲线程interruptIdleWorkers();// 具有空方法体的钩子方法onShutdown();} finally {mainLock.unlock();}tryTerminate();
}

shutdownNow():shutdown()只会中断空闲线程,shutdownNow()会中断所有线程。

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 将线程池的状态设置为STOPadvanceRunState(STOP);// 中断所有线程interruptWorkers();// 任务队列清空tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}

interruptIdleWorkers():

private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
/*** 此方法用于在特定情况下中断一个或多个空闲的工作线程* 它主要用于在工作线程空闲时,根据需要中断它们,以控制线程的执行** @param onlyOne 如果为true,则只中断一个空闲工作线程;如果为false,则中断所有空闲工作线程*/
private void interruptIdleWorkers(boolean onlyOne) {// 获取主线程锁,以确保线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍历所有工作线程for (Worker w : workers) {Thread t = w.thread;// 检查线程是否已经被中断,并尝试获取工作线程的锁// 如果成功,表示线程处于空闲状态;如果不成功,表示线程持有锁,正在执行某个任务if (!t.isInterrupted() && w.tryLock()) {try {// 中断当前工作线程t.interrupt();} catch (SecurityException ignore) {// 捕获安全异常,此处忽略} finally {// 释放工作线程的锁w.unlock();}}// 如果只需要中断一个空闲工作线程,则跳出循环if (onlyOne)break;}} finally {// 释放主线程锁mainLock.unlock();}
}

tryLock() & tryAcquire():

public boolean tryLock()  { return tryAcquire(1); }/*** 尝试获取独占锁* 此方法用于在当前线程尝试获取锁时立即获取锁,而不会被挂起* * @param unused 未使用,保留参数,通常用于未来扩展* @return 如果成功获取锁,则返回true;否则返回false*/
protected boolean tryAcquire(int unused) {// 尝试通过CAS操作获取锁,如果当前状态为0,则设置状态为1,表示已获取锁if (compareAndSetState(0, 1)) {// 设置当前线程为锁的独占所有者setExclusiveOwnerThread(Thread.currentThread());return true;}// 如果获取锁失败,返回falsereturn false;
}

interruptIfStarted():

/*** 中断所有线程,即使它们正在运行。忽略 SecurityException* (在这种情况下,某些线程可能不会被中断)。*/
private void interruptWorkers() {// 获取主锁以防止其他线程在中断期间访问共享资源final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍历所有工作线程并尝试中断它们for (Worker w : workers)w.interruptIfStarted();} finally {// 确保在任何情况下都释放主锁mainLock.unlock();}
}void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}
}

shutdown()和shutdownNow()都调用了tryTerminate()方法:tryTerminate()不会强行终止线程池,只是做了一下检测;当workQueue为空,workCount为0时,先把状态切换到TIDYING,然后调用钩子方法terminated()。当钩子方法执行完成时,把状态从TIDYING改为TERMINATED,接着调用termination.signalAll(),通知前面阻塞在awaitTermination的所有调用者线程。所以,TIDYING和TERMINATED的区别是在二者之间执行了一个钩子方法terminated(),目前是一个空实现。

final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}// 当workQueue为空,workCount为0时,执行下述代码。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将状态切换到TIDYING状态if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated(); // 调用钩子函数} finally {ctl.set(ctlOf(TERMINATED, 0)); // 将状态有TIDYING改为TERMINATEDtermination.signalAll(); // 通知awaitTermination(...)}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

2.3,任务提交过程

提交任务的方法如下:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 如果当前线程数小于corePoolSize,则启动新线程if (workerCountOf(c) < corePoolSize) {// 添加worker,并将command设置为worker线程的第一个任务开始执行if (addWorker(command, true))return;c = ctl.get();}// 如果当先的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求if (! isRunning(recheck) && remove(command))reject(command);// 放入队列中后发现没有线程执行任务,开启新线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 线程数大于maxPoolSize,并且队列已满,则调用拒绝策略else if (!addWorker(command, false))reject(command);
}
// 该方法用于启动新线程。如果第二个参数为true,则使用corePoolSize作为上线,否则使用maxPoolSize作为上线
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// 如果线程池状态值 起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列为空// 则添加worker失败,返回falseif (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {// 工作线程数达到上线,要么是corePoolSize要么是maximumPoolSize,启动线程失败if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// 增加worker数量成功,返回到retry语句if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctl// 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CASif (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}// worker数量加1成功后,接着运行boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 新建worker对象w = new Worker(firstTask);// 获取线程对象final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 加锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {// 由于线程已经在运行中,无法启动,抛异常if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 将线程对应的worker加入worker集合workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {// 释放锁mainLock.unlock();}// 如果添加worker成功,则启动该worker对应的线程if (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果启动新线程失败if (! workerStarted)// workCount - 1addWorkerFailed(w);}return workerStarted;
}

2.4,任务执行过程

在上面的任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取出任务执行,这是一个不断循环的过程。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// 当前worker对象封装的线程final Thread thread;// 线程需要运行的第一个任务。可以是null,如果是null,则线程从队列获取任务    Runnable firstTask;// 记录线程执行完成的任务数量,每个线程一个计数器   volatile long completedTasks;/*** 适应给定的第一个任务并利用线程工厂创建Worker实例* @param firstTask 线程的第一个任务,如果没有,就设置为null,此时线程会从队列获取任务*/Worker(Runnable firstTask) {setState(-1); // 线程处于阻塞状态,调用runWorker的时候中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 调用ThreadPoolExecutor的runWorker方法执行线程的运行   public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 中断Worker封装的线程w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执行任务。while (task != null || (task = getTask()) != null) {// 获取线程锁w.lock();// 如果线程池停止了,去报线程被中断// 如果线程池正在运行,确保线程不被中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 获取到任务后,再次检查线程池状态,如果发现线程池已经停止,给自己发中断信号wt.interrupt();try {// 任务执行之前的钩子方法,实现为空beforeExecute(wt, task);try {task.run();// 任务执行结束后的钩子方法,实现为空afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {// 任务执行完成,将task设置为nulltask = null;// 线程已完成的任务数加1w.completedTasks++;// 释放线程锁w.unlock();}}// 判断线程是否正常退出completedAbruptly = false;} finally {// Worker退出processWorkerExit(w, completedAbruptly);}}
}

2.5,线程池关闭过程

shutdown()与任务执行过程综合分析当调用shutdown()的时候,可能出现以下几种场景:

  • 当调用shutdown()的时候,所有线程都处于空闲状态。这意味着任务队列一定是空的。此时,所有线程都会阻塞在getTask()方法的地方。然后,所有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都会退出while循环,之后执行processWorkerExit。

  • 当调用shutdown的时候,所有线程都处于忙碌状态。此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null。之后,就和场景1一样,退出while循环。

  • 当调用shutdown()的时候,部分线程忙碌,部分线程空闲。有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在getTask()方法的地方。空闲的这些线程和场景1一样处理,不空闲的线程会和场景2一样处理。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 如果线程池调用了shutdownNow(),返回null// 如果线程池调用了shutdown(),并且任务队列为空,也返回nullif (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {// 工作线程数减1decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 如果队列为空,就会阻塞pool或者take,前者有超时时间,后者没有超时时间// 一旦中断。此处抛异常,对应上文场景1Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

 shutdownNow()与任务执行过程综合分析和上面的shutdown()类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程的中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。当一个Worker最终退出的时候,会执行清理工作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果线程正常退出,不会执行if语句,这里一般是非正常退出,需要将worker数量减1if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;// 将自己的worker从集合移除workers.remove(w);} finally {mainLock.unlock();}// 每个线程在结束的时候都会用该方法,看是否可以停止线程池tryTerminate();int c = ctl.get();// 如果在线程池退出前,发现线程池还没有关闭if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果线程池中没有其他线程来,并且任务队列非空if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果工作线程数大于min,表示队列中的任务可以由其他线程执行,退出当前线程if (workerCountOf(c) >= min)return; // replacement not needed}// 如果当前线程退出前发现线程池没有结束,任务队列不是空的,也没有其他线程来执行// 就再启动一个线程来处理addWorker(null, false);}
}

3,ScheduledThreadPoolExecutor

3.1,延迟执行和周期性执行原理

ScheduledThreadPoolExecutor实现了按时间调度来执行任务:

  • AtFixdRate:按固定频率执行,与任务本身运行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s。
  • WithFixedDelay:按固定间隔执行,与任务本身执行时间有关,例如本身执行时间是10s,间隔2s,则下一次开始执行的时间是12s。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 
  • 延迟执行任务依靠的是DelayQueue。DelayQueue是BlockingQueue的一种,其实现原理是二叉堆。不过这里并没有没有使用DelayQueue,而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue
  • 而周期性执行任务时执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。

3.2,延迟执行

schedule()方法本身很简单,就是把提交的Runnable任务加上delay时间,转换成ScheduledFutureTask对象,放入DelaydWorkerQueue中。任务的执行过程还是复用ThreadPoolExecutor,延迟的控制是在DelayedWorkerQueue内部完成的。

/*** 此方法用于安排一个任务,在指定延迟后执行一次。它提供了一种非阻塞方式安排任务的方法,支持带有延迟的任务调度,* 对于实现定时器或延迟初始化等功能非常有用。* @param command 要执行的任务,表示为Runnable。如果任务为null,将抛出NullPointerException。* @param delay 任务开始执行前的延迟时间。如果延迟小于或等于零,任务将被立即安排执行。* @param unit 延迟参数的时间单位,例如SECONDS、MINUTES等。如果unit为null,将抛出NullPointerException。* @return 返回一个ScheduledFuture对象,可以用来取消任务或查询任务状态。*/
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();// 创建一个装饰后的任务,使用ScheduledFutureTask包装原始任务RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 执行延迟任务delayedExecute(t);return t;
}

传进去的是一个Runnable,外加延迟时间delay。在内部通过decorateTask(...)方法把Runnable包装成一个ScheduledFutureTask对象,而DelaydWorkQueue中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed接口。

/*** 延迟或周期任务的主要执行方法。如果线程池已关闭,则拒绝任务。否则将任务添加到队列中,* 并在必要时启动一个线程来运行任务。(我们不能预先启动线程来运行任务,因为任务可能还没有准备好运行。)* 如果在线程池关闭期间添加任务,根据状态和关闭后运行参数,取消并移除任务。** @param task 要执行的任务*/
private void delayedExecute(RunnableScheduledFuture<?> task) {// 检查线程池是否处于关闭状态,如果是则拒绝任务if (isShutdown())reject(task);else {// 将任务添加到队列中super.getQueue().add(task);// 如果在线程池关闭期间添加任务,并且任务的执行状态不允许,// 根据状态和关闭后运行参数取消并移除任务if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) && remove(task))task.cancel(false);else// 确保启动一个线程来运行任务ensurePrestart();}
}
/*** 启动所有核心线程,使它们空闲等待任务。这将覆盖默认策略,即只有在执行新任务时才启动核心线程。** @return 返回启动的线程数量*/
public int prestartAllCoreThreads() {int n = 0;// 循环添加工作线程,直到无法再添加为止while (addWorker(null, true))++n;return n;
}

3.3,周期性执行

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();// 创建一个新的 ScheduledFutureTask 实例ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 装饰任务,可能添加额外的行为RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;// 延迟执行任务delayedExecute(t);return t;
}

和schedule(...)方法的框架基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkedQueue就结束了。两个方法的区别在于一个传入的周期是一个负数,另一个传入的周期是一个整数。

private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {private final long sequenceNumber;private volatile long time;private final long period;ScheduledFutureTask(Runnable r, V result, long triggerTime,long period, long sequenceNumber) {super(r, result);this.time = triggerTime; // 延迟时间this.period = period; // 周期this.sequenceNumber = sequenceNumber;}// 实现Delayed接口public long getDelay(TimeUnit unit) {return unit.convert(time - System.nanoTime(), NANOSECONDS);}// 实现Comparable接口public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;// 延迟时间相等,进一步比较序列号else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}// 实现Runnable接口public void run() {if (!canRunInCurrentRunState(this))cancel(false);// 如果不是周期执行,则执行一次else if (!isPeriodic())super.run();// 如果是周期执行,则重新设置下一次运行的时间,重新入队列else if (super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}// 下一次执行时间private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}
}// 下一次触发事件
long triggerTime(long delay) {return System.nanoTime() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}// 放到队列中,等待下一次执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(task)) {super.getQueue().add(task);if (canRunInCurrentRunState(task) || !remove(task)) {ensurePrestart();return;}}task.cancel(false);
}

withFixedDelay和atFixedRate的区别就体现在setNextRunTIme里面:

  • 如果是atFixedRate,period > 0,下一次开始执行时间等于上一次开始开始执行时间+period;
  • 如果是withFixedDelay,period < 0,下一次开始执行时间等于tiggerTime(-p),为now+(-period),now即上一次执行的结束时间。

http://www.mrgr.cn/news/61004.html

相关文章:

  • 管家婆财贸ERP BB055.销售检查可销库存+BB058.读取品牌销售数量
  • vue中elementUI的el-select下拉框的层级太高修改设置!
  • PFC前端电路
  • 基于centos7.9搭建MariaDB10.5高可用集群
  • 落地分享:来看 UFH AI 医疗大模型如何助力国际化诊疗场景
  • 推荐一款出色的图像查看器:Pineapple Pictures
  • 02 什么是Babel
  • 揭秘程序员薪资密码:10K 与 20K 的思维 “分水岭”
  • 【C++】多态与虚函数:深入理解对象的动态行为(万字长文详解)
  • 设计资讯 | 塑造数字交互未来的 Sol Reader
  • 快捷回复软件助力客服高效工作
  • 基于SSM(spring+springmvc+mybatis)+MySQL开发的新闻推荐系统
  • 用低配置的轻薄本玩《黑神话》是一种什么样的体验?
  • Sci Adv项目文章|ChIP-seq助力解析巨噬细胞关键调节因子AhR在黑色素瘤的进展和免疫治疗的耐药性作用
  • 苏打水奋斗过非与7656要
  • 《SMO算法 公式推导》拉格朗日乘子上界和下界
  • 什么是POJO类?
  • 关于InternVL2的环境安装
  • 等级保护测评与风险评估:企业信息安全的双剑合璧
  • vue底层原理