当前位置: 首页 > news >正文

Java Review - 线程池原理源码解析

文章目录

  • Pre
  • 为什么要用线程池
  • 线程池的优点
    • (1)重复利用线程
    • (2)控制线程的数量
  • 线程池实现原理
  • 线程池ThreadPoolExecutor
    • 类关系
    • 线程池的工作流程
    • 任务队列
    • 空闲线程的存活时间
    • 参数ThreadFactory
    • 拒绝策略
      • 被拒绝后的任务如何再次执行
    • 向线程池提交任务
      • execute
      • submit
    • 关闭线程池
      • shutdown
      • shutdownNow
      • isShutdown
      • isTerminated
  • ThreadPoolExecutor源码解析
    • 线程池的5种状态
    • 工作线程workers
    • execute提交任务
    • addWorker创建并执行工作线程
      • Worker类
      • runWorker方法
      • getTask() 从任务队列中获取下一个任务
      • processWorkerExit
    • 关闭线程池
      • shutdown
      • shutdownNow

在这里插入图片描述


Pre

每日一博 - Review线程池

每日一博 - Review线程池_02

异步编程 - 02 显式使用线程和线程池实现异步编程

并发编程-23J.U.C组件拓展之阻塞队列BlockingQueue 和 线程池

Java-Java中的线程池原理分析及使用

J.U.C Review - 线程池原理/源码解析


为什么要用线程池

当我们需要执行一个任务时,可以直接使用new Thread创建一个线程来运行任务。

线程从创建到销毁大概经历了以下步骤:

  • 1)创建Java线程实例,线程是一个对象实例,会在堆中分配内存。创建线程需要时间和内存。
  • 2)JVM为线程创建其私有资源:虚拟机栈和程序计数器。
  • 3)执行start方法启动Java线程,操作系统为Java线程创建对应的内核线程,线程处于就绪状态。内核线程属于操作系统资源,创建也需要时间和内存
  • 4)线程被操作系统CPU调度器选中后,线程开始运行任务。
  • 5)线程在运行过程中,会被CPU不断切换运行。
  • 6)线程运行完毕,Java线程被垃圾回收器回收
App JThread JVM OS CPU App JThread OS 创建Java线程实例 请求创建私有资源 创建虚拟机栈和程序计数器 调用start方法 创建内核线程 将线程置于就绪队列 调度线程运行 切换线程 loop [线程运行期间] 任务完成 通知垃圾回收器 回收线程资源 App JThread JVM OS CPU App JThread OS

从线程的执行流程来看,可以得知:

  • 1)线程不仅是Java对象,更是操作系统资源,创建线程和销毁线程,都需要时间。频繁的创建、销毁线程,会很大程度上影响处理效率。例如:创建线程花费时间T1,执行任务花费时间T2,销毁线程花费时间T3。如果T1+T3>T2,就很不划算
  • 2)Java线程的创建和运行需要占用内存空间,线程数量一大,会消耗很多内存。线程不仅需要在堆中开辟空间,还需要为每个线程分配虚拟机栈和程序计数器。根据JVM规范,一个线程默认最大栈大小是1M,这个栈空间需要从内存中分配的。
  • 3)CPU切换上下文时需要时间,线程数量多时,CPU会频繁切换线程上下文,会影响系统性能。
CPU切换
内存消耗
线程资源
频繁上下文切换
系统性能下降
堆内存
每个线程默认最大栈大小1M
Java对象
操作系统资源
内存空间
虚拟机栈
程序计数器
创建线程
时间T1
执行任务
时间T2
销毁线程
时间T3

单CPU上同时只能运行一个线程,CPU通过切换上下文运行线程,实现多线程的并发。

所以说,线程并不是越多越好,线程数量和系统性能是一种抛物线的关系,当线程数量达到某个数值的时候,性能反而会降低很多,因此对线程的管理,尤其是数量的控制能直接决定程序的性能。对线程的重复利用是非常有必要的。

为了解决这些问题,所以产生了线程池。线程池的主要目的就是为了控制线程的数量,重复利用线程,提高执行效率。


线程池的优点

线程池的优点具体如下:

(1)重复利用线程

  • 1)可以复用线程,降低了创建和销毁的性能开销。当线程数量一大,线程的创建和销毁的开销是巨大的。使用线程池,每当有新任务要执行时,可以复用已有的工作线程,大大减少了不必要的开销,这些开销包括内存开销和时间开销。
  • 2)提升任务的响应速度,当有新任务需要执行时不需创建线程可以立即执行。当有新任务需要执行时不必创建新线程,可以使用线程池中的工作线程立即执行。这样就减少了创建线程的时间消耗,减少了任务执行时间,提升了任务的响应速度。

(2)控制线程的数量

  • 1)可以根据系统承受能力,通过合理的控制线程数,防止线程数过多导致服务崩溃。

    内存:当线程数量一大,线程的创建和运行消耗内存是巨大的,甚至有可能超过服务器的承受范围,导致的内存溢出问题。根据系统承受能力,合理的控制线程数,就可以防止这种情况发生。

    CPU:CPU切换线程也是需要时间的,当线程数量过多时,CPU会频繁切换线程上下文,这个时间消耗也是不容忽视的。可以通过控制线程池的最大线程数,避免大量的线程池争夺CPU资源而造成的性能消耗。

  • 2)线程池可以对线程进行统一的管理,支持更多的功能. 比如,线程池可以根据任务执行情况,动态的调整线程池中的工作线程的数量。当任务比较少时自动回收线程,当线程不够用时则新建。使用线程池可以进行统一分配、调优和监控。


线程池实现原理

所谓线程池,通俗的理解即一个容器,里面存放着提前创建好的若干个线程,当有任务提交给线程池执行时,任务会被分配给容器中的某个线程来执行。

任务执行完毕后,这个线程不会被销毁而是重新等待分配任务。

线程池有一个任务队列,缓存着异步提交待处理的任务。当任务过多超过了任务队列的容量时,线程池会自动扩充新的线程到池子中,但是最大线程数量是有上限的。同时当任务比较少的时候,池子中的线程还能够自动回收和释放资源。

线程池
容器中存放多个线程
任务提交给线程池
任务分配给线程执行
任务执行完毕
线程重新等待任务
任务队列
缓存待处理任务
任务超过队列容量
线程池扩充新线程
最大线程数量有限制
任务较少时
线程池回收线程
释放资源

由此可知,线程池应该具备如下要素:

  • 1)线程池管理器:用于创建并控制线程数量,包括创建线程和销毁线程;
  • 2)工作线程:线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  • 3)任务队列:用于缓存提交的任务。线程池提供了一种缓冲机制,用于处理用户提交的,还没有来得及处理的任务,一般是有数量限制的。
  • 4)任务拒绝策略:如果任务队列已满且线程数已达到上限,则需要有相应的拒绝策略来处理后续任务
线程池要素
线程池管理器
创建线程
销毁线程
控制线程数量
工作线程
等待任务
执行任务
循环执行
任务队列
缓存提交的任务
缓冲机制
有数量限制
任务拒绝策略
任务队列已满
线程数达上限
处理后续任务

线程池ThreadPoolExecutor

类关系

Java线程池中标准的线程池是ThreadPoolExecutor。该线程池的接口是ExecutorExecutorService

在这里插入图片描述

  • Executor是最上层接口,只有一个方法execute,用于执行提交任务Runnable实例。

  • ExecutorService继承自Executor,定义了线程池的主要接口,拓展了任务执行的Callable方式,以及生命周期相关的方法,如关闭线程池shutdown。

    ExecutorService的生命周期有三种状态:运行、关闭、终止。其中生命周期关闭和终止相关的主要方法如下
    在这里插入图片描述


ThreadPoolExecutor继承了AbstractExecutorServiceAbstractExecutorService实现了任务执行的Callable方式。

ThreadPoolExecutor线程池有4个构造函数,下面基于它最完整的构造函数来讲解下每个参数的作用,构造函数代码如下所示

在这里插入图片描述
参数说明如下:

  • 1)corePoolSize:核心线程数。
  • 2)maximumPoolSize:最大线程数。
  • 3)workQueue:任务队列,缓存已经提交但尚未被执行的任务。
  • 4)keepAliveTime:空闲线程的存活时间。
  • 5)unit:keepAliveTime的单位。
  • 6)threadFactory:线程工厂(用于指定如何创建一个线程)​。
  • 7)handler:拒绝策略(工作队列已满且线程池中线程已达上限时的处理策略)​。
ThreadPoolExecutor
AbstractExecutorService
ExecutorService
Executor
构造函数参数
corePoolSize: 核心线程数
maximumPoolSize: 最大线程数
workQueue: 任务队列
keepAliveTime: 空闲线程存活时间
unit: keepAliveTime的单位
threadFactory: 线程工厂
handler: 拒绝策略
任务队列
缓存已提交但未执行的任务
线程工厂
指定如何创建线程
拒绝策略
工作队列已满且线程池中线程已达上限时的处理策略

线程池的工作流程

线程池刚被创建时,线程池中的线程数默认为0。当向线程池提交一个任务时,线程池的工作流程如下

  • 1)如果当前线程数<corePoolSize,则创建新的线程并执行该任务。 当有新任务需要执行时,如果当前线程数<corePoolSize,即使这时候有空闲线程,也会创建新线程执行任务。
  • 2)如果当前线程数>=corePoolSize且任务队列未满,则将任务存入任务队列中。等待线池中有空闲线程时,就会执行任务队列中的任务。
  • 3)如果任务队列已满,且当前线程数<maximumPoolSize,则新建线程执行该任务。
  • 4)如果阻塞队列已满,且当前线程数=maximumPoolSize,则执行拒绝策略,通知任务调用者线程池不再接受任务了。
线程池工作流程
当前线程数 < corePoolSize
创建新线程执行任务
当前线程数 = corePoolSize
任务队列未满
将任务存入任务队列
等待空闲线程执行任务
任务队列已满
当前线程数 < maximumPoolSize
创建新线程执行任务
当前线程数 = maximumPoolSize
执行拒绝策略
通知任务调用者线程池不再接受任务

线程池刚被创建时,是不会创建线程的,当有新任务需要执行时,线程池才会创建线程。线程在完成任务以后,线程是不会立即销毁的,线程会先查看任务队列,如果任务队列有等待执行的任务,线程会继续执行队列中的任务。如果这时没有任务需要执行,线程会主动挂起自己,当有新任务需要执行时,线程会被唤醒开始执行任务。

这样当有大量任务需要执行时,既节省了创建线程的性能损耗,也可以反复重用同一线程,节约大量性能开销.

线程池工作流程
线程池刚被创建
线程池中没有线程
有新任务需要执行
当前线程数 < corePoolSize
创建新线程执行任务
当前线程数 = corePoolSize
任务队列未满
将任务存入任务队列
等待空闲线程执行任务
任务队列已满
当前线程数 < maximumPoolSize
创建新线程执行任务
当前线程数 = maximumPoolSize
执行拒绝策略
通知任务调用者线程池不再接受任务
任务执行完毕
检查任务队列
任务队列有任务
继续执行任务队列中的任务
任务队列无任务
线程挂起
有新任务需要执行
线程被唤醒执行任务

任务队列

其中参数workQueue,是一个阻塞队列BlockingQueue,用于缓存待执行的任务。可以选择以下几种阻塞队列

  • 1)ArrayBlockingQueue:一个数组结构的有界阻塞队列,使用时需指定数量,先进先出的
  • 2)LinkedBlockingQueue:一个链表结构的阻塞队列,可指定最大容量,如果不指定容量默认为Integer.MAX_VALUE,该队列为先进先出的。线程池工厂Executors中的newFixedThreadPool线程池就使用了这种队列。
  • 3)SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作会一直处于阻塞状态。若在线程池中使用此队列,若有新任务时会直接新建一个线程来执行新任务。线程池工厂Executors中的newCachedThreadPool线程池就使用了这种队列。
  • 4)PriorityBlockingQueue:一个具有优先级的无限阻塞队列,真正的无界队列,按照元素权重出队。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。但是注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行
阻塞队列 BlockingQueue
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
PriorityBlockingQueue
数组结构
有界队列
先进先出
需指定数量
链表结构
可指定最大容量
默认容量 Integer.MAX_VALUE
先进先出
用于 newFixedThreadPool
不存储元素
插入操作阻塞
移除操作唤醒
用于 newCachedThreadPool
无限队列
优先级排序
高优先级任务先执行
可能低优先级任务永远不执行

空闲线程的存活时间

空闲线程的存活时间的参数有两个:

  • 参数keepAliveTime:空闲线程的存活时间。
  • 参数unit:keepAliveTime的单位。这两个参数的单位一般使用秒或者毫秒就够了。在TimeUnit类中,可选的单位有天(DAYS)​、小时(HOURS)​、分钟(MINUTES)​、秒(SECONDS)​、毫秒(MILLISECONDS)​、微秒(MICROSECONDS ,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)​。

这两个参数表示线程池的工作线程空闲后,其保持存活的时间。在默认情况下,只会回收非核心线程,核心线程是不回收的,但在设置了核心线程可回收后,核心线程空闲时间达到回收条件时也会被回收

allowCoreThreadTimeOut(true)
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, // corePoolSize10, // maximumPoolSize60L, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<Runnable>(), // workQueueExecutors.defaultThreadFactory(), // threadFactorynew ThreadPoolExecutor.AbortPolicy() // handler
);// 允许核心线程超时并终止
executor.allowCoreThreadTimeOut(true);

如果任务很多,并且每个任务执行的时间比较短,可以调长时间,提高线程的利用率 .


参数ThreadFactory

用于创建线程池中线程的工厂,线程池默认是使用的是Executors中的DefaultThreadFactory,线程格式为pool-{线程池id}-thread-{线程id}。

可以通过自定义线程工厂类给创建出来的线程设置更有意义的名字,方便出错时回溯,对故障定位。

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;public class CustomThreadFactory implements ThreadFactory {private final String poolName;private final AtomicInteger threadId = new AtomicInteger(1);public CustomThreadFactory(String poolName) {this.poolName = poolName;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, poolName + "-thread-" + threadId.getAndIncrement());t.setDaemon(false); // 设置为非守护线程t.setPriority(Thread.NORM_PRIORITY); // 设置线程优先级return t;}
}

如果觉得实现麻烦,还可以使用开源框架guava提供的ThreadFactoryBuilder,它可以给线程池里的线程设置有意义的名字

import com.google.common.util.concurrent.ThreadFactoryBuilder;public class GuavaThreadFactoryExample {public static void main(String[] args) {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-pool-%d").setDaemon(true) // 设置为守护线程.setPriority(Thread.MAX_PRIORITY) // 设置线程优先级.build();ThreadPoolExecutor executor = new ThreadPoolExecutor(5, // corePoolSize10, // maximumPoolSize60L, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<Runnable>(), // workQueuethreadFactory, // threadFactorynew ThreadPoolExecutor.AbortPolicy() // handler);// 提交任务executor.execute(new Runnable() {@Overridepublic void run() {System.out.println("任务执行中...");}});}
}

拒绝策略

当线程池中任务队列和线程都满时,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

在JDK中Java提供了以下4种拒绝策略。

  • AbortPolicy:新任务直接被拒绝,并抛出异常RejectedExecutionException。
  • DiscardPolicy:新任务忽略不执行,丢弃。
  • DiscardOldestPolicy:抛弃任务队列中等待最久的任务,将新任务添加到等待队列中。
  • CallerRunPolicy:新任务使用调用者所在线程来执行任务。
拒绝策略
AbortPolicy
DiscardPolicy
DiscardOldestPolicy
CallerRunPolicy
新任务直接被拒绝
抛出 RejectedExecutionException 异常
新任务忽略不执行
丢弃
抛弃任务队列中等待最久的任务
将新任务添加到等待队列中
新任务使用调用者所在线程来执行任务

当然,也可以根据应用场景需要,来实现RejectedExecutionHandler接口自定义策略。比如记录日志或持久化存储不能处理的任务。可以使用两个方法向线程池提交任务,分别为execute和submit方法。

自定义拒绝策略
实现 RejectedExecutionHandler 接口
记录日志
持久化存储任务
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 记录日志System.err.println("Task " + r.toString() + " is rejected");// 持久化存储任务persistTask(r);}private void persistTask(Runnable r) {// 实现任务持久化逻辑System.out.println("Persisting task: " + r.toString());}
}

被拒绝后的任务如何再次执行

  1. 自定义拒绝策略

首先,实现 RejectedExecutionHandler 接口,定义自定义的拒绝策略。

自定义拒绝策略
实现 RejectedExecutionHandler 接口
任务被拒绝时
记录日志
调用 persistTask 方法
将任务持久化
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 记录日志System.err.println("Task " + r.toString() + " is rejected");// 持久化存储任务persistTask(r);}private void persistTask(Runnable r) {// 实现任务持久化逻辑System.out.println("Persisting task: " + r.toString());// 假设任务被持久化到一个文件或数据库中// 这里只是一个简单的示例,实际应用中需要实现具体的持久化逻辑TaskStorage.storeTask(r);}
}

  1. 任务存储类

定义一个任务存储类 TaskStorage,用于模拟任务的持久化和读取。

任务存储类 TaskStorage
storeTask 方法
将任务添加到存储列表中
getStoredTasks 方法
返回存储的任务列表
clearTasks 方法
清空存储的任务列表
import java.util.ArrayList;
import java.util.List;public class TaskStorage {private static List<Runnable> storedTasks = new ArrayList<>();public static void storeTask(Runnable task) {storedTasks.add(task);}public static List<Runnable> getStoredTasks() {return new ArrayList<>(storedTasks);}public static void clearTasks() {storedTasks.clear();}
}

  1. 重新提交任务

在适当的时候,从任务存储中读取任务并重新提交到线程池中。

MainThread ThreadPoolExecutor TaskStorage 创建线程池 提交任务 loop [提交任务] 等待一段时间 从任务存储中读取任务 返回存储的任务列表 重新提交任务 loop [重新提交任务] 关闭线程池 MainThread ThreadPoolExecutor TaskStorage
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolExample {public static void main(String[] args) {// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(5, // corePoolSize10, // maximumPoolSize60L, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<Runnable>(10), // workQueueExecutors.defaultThreadFactory(), // threadFactorynew CustomRejectedExecutionHandler() // handler);// 提交一些任务for (int i = 0; i < 20; i++) {int taskId = i;executor.execute(() -> {System.out.println("Task " + taskId + " is running...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});}// 等待一段时间,确保所有任务都处理完毕try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}// 从任务存储中读取任务并重新提交List<Runnable> storedTasks = TaskStorage.getStoredTasks();if (!storedTasks.isEmpty()) {System.out.println("Re-submitting stored tasks...");for (Runnable task : storedTasks) {executor.execute(task);}}// 关闭线程池executor.shutdown();}
}

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute和submit方法。

提交任务的方法
execute
submit
提交 Runnable 任务
任务没有返回值
提交 Runnable 任务
返回 Future
提交 Runnable 任务, 返回结果
返回 Future
提交 Callable 任务
返回 Future

execute

(1)execute方法ThreadPoolExecutor中的方法execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute传入的任务是一个Runnable类的实例

execute 方法
提交不需要返回值的任务
传入的任务是一个 Runnable 类的实例
无法判断任务是否被线程池执行成功

submit

(2)submit方法submit相关的方法有三种,用于提交需要返回值的任务。任务提交后,线程池会立即返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过它的get方法来获取任务的计算结果。执行get方法会阻塞当前线程直到任务完成,而使用get(longtimeout,TimeUnit unit)方法则会阻塞当前线程一段时间后返回,这时候有可能任务没有执行完。

submit 方法:

  • 提交需要返回值的任务:submit 方法用于提交需要返回值的任务。
  • submit(Runnable task):提交一个 Runnable 任务,返回 Future<?> 对象。
  • submit(Runnable task, T result):提交一个 Runnable 任务,并指定任务完成后的结果,返回 Future<T> 对象。
  • submit(Callable<T> task):提交一个 Callable 任务,返回 Future<T> 对象。

Future 对象:

  • 通过 get() 方法获取结果:调用 get() 方法获取任务的计算结果。

  • 阻塞当前线程直到任务完成:调用 get() 方法会阻塞当前线程,直到任务完成。

  • get(long timeout, TimeUnit unit) 方法:调用 get(long timeout, TimeUnit unit) 方法会阻塞当前线程一段时间后返回,如果在这段时间内任务没有完成,可能会抛出 TimeoutException。

  • 阻塞当前线程一段时间后返回:调用 get(long timeout, TimeUnit unit) 方法会阻塞当前线程一段时间后返回,如果任务在这段时间内没有完成,可能会抛出TimeoutException。


关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池

shutdown

  • 线程池不再接收新的任务:调用 shutdown() 方法后,线程池不再接收新的任务。
  • 所有任务(包括正在执行的和等待队列中的)完成后,线程池关闭:线程池会等待所有任务(包括正在执行的和等待队列中的)完成后,才会关闭。

shutdownNow

  • 线程池不再接收新的任务:调用 shutdownNow() 方法后,线程池不再接收新的任务。
  • 等待队列中的任务将不会被执行:等待队列中的任务将不会被执行。
  • 尝试中断正在运行的线程:线程池会尝试中断正在运行的线程。
  • 无法响应中断的任务无法终止:如果任务无法响应中断,它们将无法终止。
  • 只能等待任务执行完毕后,才能关闭线程池:对于无法响应中断的任务,只能等待它们执行完毕后,线程池才能关闭。

isShutdown

返回 true:只要调用了 shutdown() 或 shutdownNow() 方法,isShutdown 方法就会返回 true。

isTerminated

返回 true:当所有的任务都已关闭后,isTerminated 方法会返回 true。

使用建议

  • 通常调用 shutdown 方法来关闭线程池:如果任务重要且需要执行完,建议使用 shutdown 方法。
  • 如果任务不重要不一定要执行完,则可以调用 shutdownNow 方法:如果任务不重要或可以中断,可以使用 shutdownNow 方法。

ThreadPoolExecutor源码解析

线程池的5种状态

工作线程workers

execute提交任务

 public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** 执行过程分为三个步骤:** 1. 如果当前运行的线程数少于核心线程数,尝试启动一个新的线程,*    并将给定的命令作为新线程的第一个任务。调用 addWorker 方法会原子地检查 runState 和 workerCount,*    从而防止在不应该添加线程时错误地添加线程,通过返回 false 来避免这种情况。** 2. 如果任务可以成功入队,我们仍然需要双重检查是否应该添加线程(因为自上次检查以来现有线程可能已经死亡)*    或者线程池是否在进入此方法后已关闭。因此,我们需要重新检查状态,如果线程池已停止,则回滚入队操作;*    如果没有线程在运行,则启动一个新线程。** 3. 如果任务无法入队,我们尝试添加一个新线程。如果失败,我们知道线程池已关闭或已饱和,*    因此拒绝该任务。*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 当前工作线程数少于核心线程数if (addWorker(command, true))return; // 成功添加工作线程,直接返回c = ctl.get(); // 重新获取控制状态}if (isRunning(c) && workQueue.offer(command)) {// 线程池正在运行且任务成功入队int recheck = ctl.get(); // 重新获取控制状态if (!isRunning(recheck) && remove(command)) {// 线程池已停止且任务已从队列中移除reject(command); // 拒绝任务} else if (workerCountOf(recheck) == 0) {// 重新检查工作线程数是否为0addWorker(null, false); // 添加一个空闲工作线程}} else if (!addWorker(command, false)) {// 任务无法入队且无法添加新线程reject(command); // 拒绝任务}
}
任务为空
任务不为空
成功
失败
线程池不在运行
线程池在运行
成功
失败
execute 方法
检查任务是否为空
抛出 NullPointerException
获取控制状态 c
当前工作线程数小于核心线程数
尝试添加工作线程
返回
重新获取控制状态 c
检查线程池是否在运行
将任务加入工作队列
重新获取控制状态 recheck
从队列移除任务
拒绝任务
检查工作线程数是否为0
添加空闲工作线程
返回
尝试添加工作线程
返回
拒绝任务

addWorker创建并执行工作线程

RUNNING
SHUTDOWN 且 firstTask 不为 null
STOP
core 为 true 且 wc < corePoolSize
core 为 false 且 wc < maximumPoolSize
wc >= CAPACITY
CAS 成功
CAS 失败
启动成功
启动失败
addWorker 方法
获取初始状态
检查线程池状态
继续执行
返回 false
返回 false
检查工作线程数
尝试 CAS 增加工作线程数
返回 false
创建并启动工作线程
重新获取控制状态并重试
创建 Worker 对象
创建新线程
启动新线程
返回 true
回滚工作线程数并返回 false
/*** 检查根据当前线程池状态和给定的边界(核心或最大),是否可以添加新的工作线程。如果可以,相应地调整工作线程的数量,* 并且如果可能,创建并启动一个新的工作线程,运行 `firstTask` 作为其第一个任务。如果线程池已停止或符合关闭条件,* 或者线程工厂无法创建线程时,此方法返回 false。如果线程创建失败(通常是由于线程工厂返回 null 或者在 `Thread.start()` 时抛出异常,如 `OutOfMemoryError`),* 我们会干净地回滚。** @param firstTask 新线程应首先运行的任务(如果没有则为 null)。当线程池中的线程少于核心线程数时(在这种情况下我们总是启动一个新线程),*                  或者当队列已满时(在这种情况下我们必须绕过队列),工作线程会在 `execute()` 方法中创建并带有初始的第一个任务。*                  初始空闲线程通常通过 `prestartCoreThread` 创建,或者用于替换其他即将死亡的工作线程。** @param core 如果为 true,则使用核心线程数作为边界,否则使用最大线程数。这里使用布尔指示器而不是值,以确保在检查其他线程池状态后读取最新的值。* @return 如果成功则返回 true*/
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 必要时检查队列是否为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // 重新读取 ctlif (runStateOf(c) != rs)continue retry;// 否则,CAS 失败是因为工作线程数量发生变化;重试内部循环}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 持有锁时重新检查// 如果线程工厂失败或在获取锁之前已关闭,则回退int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 预检查 t 是否可启动throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

Worker类

addWorker方法只是构造了一个Worker,并且把firstTask封装到Worker中。

每个Worker都是一条线程,包含了一个firstTask初始化时要被首先执行的任务。最终执行任务的是runWorker()方法。

属性

  • Worker类继承了AQS,并实现了Runnable接口。

  • firstTask字段用来保存传入的任务;表示 Worker 被创建时需要运行的第一个任务。如果为 null,则表示 Worker 是空闲的

  • thread字段,是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。表示实际的工作线程。每个 Worker 实例都对应一个 Thread 实例

  • volatile long completedTasks: 记录该 Worker 已完成的任务数量。这是一个 volatile 变量,确保多线程环境下的可见性和有序性。

构造方法

在调用构造方法时,需要传入任务,这里通过getThreadFactory(). newThread(this)来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。


runWorker方法

在 run 方法中,调用 runWorker(this) 方法来执行任务

public void run() {runWorker(this);
}
/*** 主工作线程循环。重复从队列中获取任务并执行它们,同时处理多个问题:** 1. 我们可能从一个初始任务开始,在这种情况下,我们不需要获取第一个任务。否则,只要线程池在运行,我们就从 getTask 获取任务。*    如果 getTask 返回 null,则由于线程池状态或配置参数的变化,工作线程退出。其他退出情况是由于外部代码抛出异常,此时 completedAbruptly 为 true,*    通常会导致 processWorkerExit 替换这个线程。** 2. 在运行任何任务之前,获取锁以防止任务执行期间其他线程中断,然后确保除非线程池正在停止,否则该线程不会被中断。** 3. 每个任务运行之前都会调用 beforeExecute,可能会抛出异常,这种情况下我们会使线程终止(通过设置 completedAbruptly 为 true 退出循环)而不处理任务。** 4. 假设 beforeExecute 正常完成,我们运行任务,并收集其抛出的所有异常以传递给 afterExecute。*    我们分别处理 RuntimeException、Error(规范保证我们捕获这两种异常)和任意的 Throwable。*    由于我们不能在 Runnable.run 中重新抛出 Throwable,我们在退出时将其包装在 Error 中(传递给线程的 UncaughtExceptionHandler)。*    任何抛出的异常也会保守地导致线程终止。** 5. 在 task.run 完成后,我们调用 afterExecute,它也可能抛出异常,这同样会导致线程终止。*    根据 JLS 第 14.20 节,即使 task.run 抛出异常,这个异常也是有效的。** 异常机制的总体效果是,afterExecute 和线程的 UncaughtExceptionHandler 可以获得关于用户代码遇到的问题的最准确信息。** @param w 工作线程*/
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// 如果线程池正在停止,确保线程被中断;// 否则,确保线程不被中断。这需要在第二种情况下重新检查,以处理清除中断时的 shutdownNow 竞争条件if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

初始化:

  • 使用 retry 标签开始一个无限循环。
  • 获取当前控制状态 c 和运行状态 rs。

检查线程池状态:

  • 如果线程池状态大于等于 SHUTDOWN,并且不符合特定条件(即线程池处于 SHUTDOWN 状态且没有初始任务且队列不为空),则返回 false。

检查工作线程数量:

  • 获取当前工作线程数量 wc。
  • 如果工作线程数量达到容量上限或超过核心/最大线程数,则返回 false。
  • 使用 compareAndIncrementWorkerCount 尝试增加工作线程数量,如果成功则跳出 retry 循环。
  • 如果 compareAndIncrementWorkerCount 失败,重新读取控制状态 c 并检查运行状态 rs,如果运行状态发生变化则继续 retry 循环。

创建和启动工作线程:

  • 尝试创建一个新的 Worker 对象 w。
  • 获取 Worker 的线程 t。
  • 如果线程 t 不为 null,则持有主锁 mainLock。
  • 重新检查线程池状态,如果线程池未停止或处于 SHUTDOWN 状态且没有初始任务,则检查线程是否已启动。
  • 将 Worker 添加到工作线程列表 workers,更新最大线程数 largestPoolSize,并设置 workerAdded 为 true。
  • 释放主锁 mainLock。
  • 如果 workerAdded 为 true,启动线程 t 并设置 workerStarted 为 true。

异常处理:

  • 如果 workerStarted 为 false,调用 addWorkerFailed 方法进行回滚处理。

返回结果:

  • 返回 workerStarted,表示是否成功启动了新的工作线程

getTask() 从任务队列中获取下一个任务

/*** 根据当前配置设置执行阻塞或定时等待任务,或者在以下情况下返回 null:* 1. 工作线程数超过 maximumPoolSize(由于调用了 setMaximumPoolSize)。* 2. 线程池已停止。* 3. 线程池已关闭且任务队列为空。* 4. 此工作线程在等待任务时超时,并且超时的工作线程将被终止(即,*    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})*    在定时等待前后,如果队列不为空,则此工作线程不是池中的最后一个线程。** @return 任务,或者如果工作线程必须退出,则返回 null,在这种情况下 workerCount 将递减*/
private Runnable getTask() {boolean timedOut = false; // 上一次 poll() 是否超时?for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果必要,检查队列是否为空。if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 工作线程是否可以被裁剪?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

processWorkerExit

开始
是否 completedAbruptly
减少工作线程数
获取主锁
增加已完成任务计数
从工作线程集合中移除该工作线程
释放主锁
尝试终止线程池
获取当前状态
线程池状态 < STOP
结束
是否 completedAbruptly
计算最小工作线程数
添加新的工作线程
最小工作线程数 == 0 且 队列不为空
当前工作线程数 >= 最小工作线程数
结束
最小工作线程数设为 1
/*** 执行即将死亡的工作线程的清理和账簿记录。仅从工作线程调用。除非设置了 completedAbruptly,* 否则假设 workerCount 已经调整以反映退出。此方法从工作线程集合中移除线程,* 并可能终止线程池或替换工作线程,如果它因用户任务异常退出,或者运行中的工作线程少于* corePoolSize,或者队列不为空但没有工作线程。** @param w 即将退出的工作线程* @param completedAbruptly 如果工作线程因用户异常而死亡*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果突然退出,则 workerCount 没有调整decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; // 增加已完成任务计数workers.remove(w); // 从工作线程集合中移除该工作线程} finally {mainLock.unlock();}tryTerminate(); // 尝试终止线程池int c = ctl.get();if (runStateLessThan(c, STOP)) { // 如果线程池状态小于 STOPif (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 计算最小工作线程数if (min == 0 && !workQueue.isEmpty())min = 1; // 如果最小工作线程数为 0 且队列不为空,则最小工作线程数设为 1if (workerCountOf(c) >= min)return; // 如果当前工作线程数大于等于最小工作线程数,则不需要替换}addWorker(null, false); // 添加新的工作线程}
}

关闭线程池

shutdown

/*** 开始有序关闭,在此过程中之前提交的任务将继续执行,但不再接受新任务。* 如果已经关闭,则调用此方法不会产生额外效果。** <p>此方法不会等待之前提交的任务完成执行。使用 {@link #awaitTermination awaitTermination} 来实现这一点。** @throws SecurityException {@inheritDoc}*/
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 检查关闭权限advanceRunState(SHUTDOWN); // 将运行状态推进到 SHUTDOWNinterruptIdleWorkers(); // 中断空闲的工作线程onShutdown(); // 为 ScheduledThreadPoolExecutor 提供的钩子方法} finally {mainLock.unlock(); // 释放主锁}tryTerminate(); // 尝试终止线程池
}
开始
获取主锁
检查关闭权限
将运行状态推进到 SHUTDOWN
中断空闲的工作线程
调用 onShutdown 钩子方法
释放主锁
尝试终止线程池
结束

shutdownNow

/*** 尝试停止所有正在执行的任务,停止处理等待中的任务,并返回一个包含等待执行任务的列表。* 这些任务在方法返回时将从任务队列中被移除。** <p>此方法不会等待正在执行的任务终止。使用 {@link #awaitTermination awaitTermination} 来实现这一点。** <p>除了尽力尝试停止正在执行的任务外,没有其他保证。此实现通过 {@link Thread#interrupt} 取消任务,* 因此任何不响应中断的任务可能永远不会终止。** @throws SecurityException {@inheritDoc}*/
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 检查关闭权限advanceRunState(STOP); // 将运行状态推进到 STOPinterruptWorkers(); // 中断所有工作线程tasks = drainQueue(); // 从任务队列中移除并返回等待的任务} finally {mainLock.unlock(); // 释放主锁}tryTerminate(); // 尝试终止线程池return tasks; // 返回等待的任务列表
}
开始
获取主锁
检查关闭权限
将运行状态推进到 STOP
中断所有工作线程
从任务队列中移除并返回等待的任务
释放主锁
尝试终止线程池
返回等待的任务列表
结束

在这里插入图片描述


http://www.mrgr.cn/news/71925.html

相关文章:

  • 为什么transformer的时间复杂度是N的平方,具体是里面的哪一个计算流程最占用时间
  • 知识图谱介绍
  • php:使用socket函数创建WebSocket服务
  • 问题定位学习
  • Seatunnel解决Excel中无法将数字类型转换成字符串类型以及源码打包
  • 2024/11/20学习日志
  • Ubuntu linux 命令总结
  • 如何理解DDoS安全防护在企业安全防护中的作用
  • 聊聊Flink:Flink的运行时架构
  • 几何合理的分片段感知的3D分子生成 FragGen - 评测
  • WebStorm 如何调试 Vue 项目
  • C++基础(12.红黑树实现)
  • [运维][Nginx]Nginx学习(2/5)-Nginx高级
  • 241112
  • 【Linux】————信号
  • java数据结构与算法:栈
  • 用户,组管理命令
  • 高情商的人都在用的处事细节和技巧
  • 人工智能助手是否让程序员技能退化?
  • Java多线程进阶(锁策略)
  • python 由于系统缓冲区空间不足或队列已满,不能执行套接字上的操作
  • 政务数据治理专栏开搞!
  • 时间空间频域融合的Corssformer时间序列预测项目
  • Fortran安装(vscode+gcc+Python)
  • Django Form
  • JVM——类加载器、类加载器的分类