Java Review - 线程池原理源码解析
文章目录
- Pre
- 为什么要用线程池
- 线程池的优点
- (1)重复利用线程
- (2)控制线程的数量
- 线程池实现原理
- 线程池ThreadPoolExecutor
- 类关系
- 线程池的工作流程
- 任务队列
- 空闲线程的存活时间
- 参数ThreadFactory
- 拒绝策略
- 被拒绝后的任务如何再次执行
- 向线程池提交任务
- execute
- submit
- 关闭线程池
- shutdown
- shutdownNow
- isShutdown
- isTerminated
- ThreadPoolExecutor源码解析
- 线程池的5种状态
- 工作线程workers
- execute提交任务
- addWorker创建并执行工作线程
- Worker类
- runWorker方法
- getTask() 从任务队列中获取下一个任务
- processWorkerExit
- 关闭线程池
- shutdown
- shutdownNow
Pre
每日一博 - Review线程池
每日一博 - Review线程池_02
异步编程 - 02 显式使用线程和线程池实现异步编程
并发编程-23J.U.C组件拓展之阻塞队列BlockingQueue 和 线程池
Java-Java中的线程池原理分析及使用
J.U.C Review - 线程池原理/源码解析
为什么要用线程池
当我们需要执行一个任务时,可以直接使用new Thread创建一个线程来运行任务。
线程从创建到销毁大概经历了以下步骤:
- 1)创建Java线程实例,线程是一个对象实例,会在堆中分配内存。创建线程需要时间和内存。
- 2)JVM为线程创建其私有资源:虚拟机栈和程序计数器。
- 3)执行start方法启动Java线程,操作系统为Java线程创建对应的内核线程,线程处于就绪状态。内核线程属于操作系统资源,创建也需要时间和内存
- 4)线程被操作系统CPU调度器选中后,线程开始运行任务。
- 5)线程在运行过程中,会被CPU不断切换运行。
- 6)线程运行完毕,Java线程被垃圾回收器回收
从线程的执行流程来看,可以得知:
- 1)线程不仅是Java对象,更是操作系统资源,创建线程和销毁线程,都需要时间。频繁的创建、销毁线程,会很大程度上影响处理效率。例如:创建线程花费时间T1,执行任务花费时间T2,销毁线程花费时间T3。如果T1+T3>T2,就很不划算
- 2)Java线程的创建和运行需要占用内存空间,线程数量一大,会消耗很多内存。线程不仅需要在堆中开辟空间,还需要为每个线程分配虚拟机栈和程序计数器。根据JVM规范,一个线程默认最大栈大小是1M,这个栈空间需要从内存中分配的。
- 3)CPU切换上下文时需要时间,线程数量多时,CPU会频繁切换线程上下文,会影响系统性能。
单CPU上同时只能运行一个线程,CPU通过切换上下文运行线程,实现多线程的并发。
所以说,线程并不是越多越好,线程数量和系统性能是一种抛物线的关系,当线程数量达到某个数值的时候,性能反而会降低很多,因此对线程的管理,尤其是数量的控制能直接决定程序的性能。对线程的重复利用是非常有必要的。
为了解决这些问题,所以产生了线程池。线程池的主要目的就是为了控制线程的数量,重复利用线程,提高执行效率。
线程池的优点
线程池的优点具体如下:
(1)重复利用线程
- 1)可以复用线程,降低了创建和销毁的性能开销。当线程数量一大,线程的创建和销毁的开销是巨大的。使用线程池,每当有新任务要执行时,可以复用已有的工作线程,大大减少了不必要的开销,这些开销包括内存开销和时间开销。
- 2)提升任务的响应速度,当有新任务需要执行时不需创建线程可以立即执行。当有新任务需要执行时不必创建新线程,可以使用线程池中的工作线程立即执行。这样就减少了创建线程的时间消耗,减少了任务执行时间,提升了任务的响应速度。
(2)控制线程的数量
-
1)可以根据系统承受能力,通过合理的控制线程数,防止线程数过多导致服务崩溃。
内存:当线程数量一大,线程的创建和运行消耗内存是巨大的,甚至有可能超过服务器的承受范围,导致的内存溢出问题。根据系统承受能力,合理的控制线程数,就可以防止这种情况发生。
CPU:CPU切换线程也是需要时间的,当线程数量过多时,CPU会频繁切换线程上下文,这个时间消耗也是不容忽视的。可以通过控制线程池的最大线程数,避免大量的线程池争夺CPU资源而造成的性能消耗。
-
2)线程池可以对线程进行统一的管理,支持更多的功能. 比如,线程池可以根据任务执行情况,动态的调整线程池中的工作线程的数量。当任务比较少时自动回收线程,当线程不够用时则新建。使用线程池可以进行统一分配、调优和监控。
线程池实现原理
所谓线程池,通俗的理解即一个容器,里面存放着提前创建好的若干个线程,当有任务提交给线程池执行时,任务会被分配给容器中的某个线程来执行。
任务执行完毕后,这个线程不会被销毁而是重新等待分配任务。
线程池有一个任务队列,缓存着异步提交待处理的任务。当任务过多超过了任务队列的容量时,线程池会自动扩充新的线程到池子中,但是最大线程数量是有上限的。同时当任务比较少的时候,池子中的线程还能够自动回收和释放资源。
由此可知,线程池应该具备如下要素:
- 1)线程池管理器:用于创建并控制线程数量,包括创建线程和销毁线程;
- 2)工作线程:线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
- 3)任务队列:用于缓存提交的任务。线程池提供了一种缓冲机制,用于处理用户提交的,还没有来得及处理的任务,一般是有数量限制的。
- 4)任务拒绝策略:如果任务队列已满且线程数已达到上限,则需要有相应的拒绝策略来处理后续任务
线程池ThreadPoolExecutor
类关系
Java线程池中标准的线程池是ThreadPoolExecutor
。该线程池的接口是Executor
和ExecutorService
。
-
Executor
是最上层接口,只有一个方法execute,用于执行提交任务Runnable实例。 -
ExecutorService
继承自Executor
,定义了线程池的主要接口,拓展了任务执行的Callable方式,以及生命周期相关的方法,如关闭线程池shutdown。ExecutorService的生命周期有三种状态:运行、关闭、终止。其中生命周期关闭和终止相关的主要方法如下
ThreadPoolExecutor
继承了AbstractExecutorService
,AbstractExecutorService
实现了任务执行的Callable
方式。
ThreadPoolExecutor
线程池有4个构造函数,下面基于它最完整的构造函数来讲解下每个参数的作用,构造函数代码如下所示
参数说明如下:
- 1)corePoolSize:核心线程数。
- 2)maximumPoolSize:最大线程数。
- 3)workQueue:任务队列,缓存已经提交但尚未被执行的任务。
- 4)keepAliveTime:空闲线程的存活时间。
- 5)unit:keepAliveTime的单位。
- 6)threadFactory:线程工厂(用于指定如何创建一个线程)。
- 7)handler:拒绝策略(工作队列已满且线程池中线程已达上限时的处理策略)。
线程池的工作流程
线程池刚被创建时,线程池中的线程数默认为0。当向线程池提交一个任务时,线程池的工作流程如下
- 1)如果当前线程数<corePoolSize,则创建新的线程并执行该任务。 当有新任务需要执行时,如果当前线程数<corePoolSize,即使这时候有空闲线程,也会创建新线程执行任务。
- 2)如果当前线程数>=corePoolSize且任务队列未满,则将任务存入任务队列中。等待线池中有空闲线程时,就会执行任务队列中的任务。
- 3)如果任务队列已满,且当前线程数<maximumPoolSize,则新建线程执行该任务。
- 4)如果阻塞队列已满,且当前线程数=maximumPoolSize,则执行拒绝策略,通知任务调用者线程池不再接受任务了。
线程池刚被创建时,是不会创建线程的,当有新任务需要执行时,线程池才会创建线程。线程在完成任务以后,线程是不会立即销毁的,线程会先查看任务队列,如果任务队列有等待执行的任务,线程会继续执行队列中的任务。如果这时没有任务需要执行,线程会主动挂起自己,当有新任务需要执行时,线程会被唤醒开始执行任务。
这样当有大量任务需要执行时,既节省了创建线程的性能损耗,也可以反复重用同一线程,节约大量性能开销.
任务队列
其中参数workQueue,是一个阻塞队列BlockingQueue,用于缓存待执行的任务。可以选择以下几种阻塞队列
- 1)
ArrayBlockingQueue
:一个数组结构的有界阻塞队列,使用时需指定数量,先进先出的 - 2)
LinkedBlockingQueue
:一个链表结构的阻塞队列,可指定最大容量,如果不指定容量默认为Integer.MAX_VALUE,该队列为先进先出的。线程池工厂Executors中的newFixedThreadPool线程池就使用了这种队列。 - 3)
SynchronousQueue
:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作会一直处于阻塞状态。若在线程池中使用此队列,若有新任务时会直接新建一个线程来执行新任务。线程池工厂Executors中的newCachedThreadPool线程池就使用了这种队列。 - 4)
PriorityBlockingQueue
:一个具有优先级的无限阻塞队列,真正的无界队列,按照元素权重出队。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。但是注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行
空闲线程的存活时间
空闲线程的存活时间的参数有两个:
- 参数keepAliveTime:空闲线程的存活时间。
- 参数unit:keepAliveTime的单位。这两个参数的单位一般使用秒或者毫秒就够了。在TimeUnit类中,可选的单位有
天(DAYS)、小时(HOURS)、分钟(MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)、微秒(MICROSECONDS ,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
。
这两个参数表示线程池的工作线程空闲后,其保持存活的时间。在默认情况下,只会回收非核心线程,核心线程是不回收的,但在设置了核心线程可回收后,核心线程空闲时间达到回收条件时也会被回收
allowCoreThreadTimeOut(true)
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, // corePoolSize10, // maximumPoolSize60L, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<Runnable>(), // workQueueExecutors.defaultThreadFactory(), // threadFactorynew ThreadPoolExecutor.AbortPolicy() // handler
);// 允许核心线程超时并终止
executor.allowCoreThreadTimeOut(true);
如果任务很多,并且每个任务执行的时间比较短,可以调长时间,提高线程的利用率 .
参数ThreadFactory
用于创建线程池中线程的工厂,线程池默认是使用的是Executors中的DefaultThreadFactory,线程格式为pool-{线程池id}-thread-{线程id}。
可以通过自定义线程工厂类给创建出来的线程设置更有意义的名字,方便出错时回溯,对故障定位。
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;public class CustomThreadFactory implements ThreadFactory {private final String poolName;private final AtomicInteger threadId = new AtomicInteger(1);public CustomThreadFactory(String poolName) {this.poolName = poolName;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, poolName + "-thread-" + threadId.getAndIncrement());t.setDaemon(false); // 设置为非守护线程t.setPriority(Thread.NORM_PRIORITY); // 设置线程优先级return t;}
}
如果觉得实现麻烦,还可以使用开源框架guava提供的ThreadFactoryBuilder,它可以给线程池里的线程设置有意义的名字
import com.google.common.util.concurrent.ThreadFactoryBuilder;public class GuavaThreadFactoryExample {public static void main(String[] args) {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-pool-%d").setDaemon(true) // 设置为守护线程.setPriority(Thread.MAX_PRIORITY) // 设置线程优先级.build();ThreadPoolExecutor executor = new ThreadPoolExecutor(5, // corePoolSize10, // maximumPoolSize60L, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<Runnable>(), // workQueuethreadFactory, // threadFactorynew ThreadPoolExecutor.AbortPolicy() // handler);// 提交任务executor.execute(new Runnable() {@Overridepublic void run() {System.out.println("任务执行中...");}});}
}
拒绝策略
当线程池中任务队列和线程都满时,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
在JDK中Java提供了以下4种拒绝策略。
AbortPolicy
:新任务直接被拒绝,并抛出异常RejectedExecutionException。DiscardPolicy
:新任务忽略不执行,丢弃。DiscardOldestPolicy
:抛弃任务队列中等待最久的任务,将新任务添加到等待队列中。CallerRunPolicy
:新任务使用调用者所在线程来执行任务。
当然,也可以根据应用场景需要,来实现RejectedExecutionHandler
接口自定义策略。比如记录日志或持久化存储不能处理的任务。可以使用两个方法向线程池提交任务,分别为execute和submit方法。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 记录日志System.err.println("Task " + r.toString() + " is rejected");// 持久化存储任务persistTask(r);}private void persistTask(Runnable r) {// 实现任务持久化逻辑System.out.println("Persisting task: " + r.toString());}
}
被拒绝后的任务如何再次执行
- 自定义拒绝策略
首先,实现 RejectedExecutionHandler 接口,定义自定义的拒绝策略。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 记录日志System.err.println("Task " + r.toString() + " is rejected");// 持久化存储任务persistTask(r);}private void persistTask(Runnable r) {// 实现任务持久化逻辑System.out.println("Persisting task: " + r.toString());// 假设任务被持久化到一个文件或数据库中// 这里只是一个简单的示例,实际应用中需要实现具体的持久化逻辑TaskStorage.storeTask(r);}
}
- 任务存储类
定义一个任务存储类 TaskStorage,用于模拟任务的持久化和读取。
import java.util.ArrayList;
import java.util.List;public class TaskStorage {private static List<Runnable> storedTasks = new ArrayList<>();public static void storeTask(Runnable task) {storedTasks.add(task);}public static List<Runnable> getStoredTasks() {return new ArrayList<>(storedTasks);}public static void clearTasks() {storedTasks.clear();}
}
- 重新提交任务
在适当的时候,从任务存储中读取任务并重新提交到线程池中。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolExample {public static void main(String[] args) {// 创建线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(5, // corePoolSize10, // maximumPoolSize60L, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<Runnable>(10), // workQueueExecutors.defaultThreadFactory(), // threadFactorynew CustomRejectedExecutionHandler() // handler);// 提交一些任务for (int i = 0; i < 20; i++) {int taskId = i;executor.execute(() -> {System.out.println("Task " + taskId + " is running...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});}// 等待一段时间,确保所有任务都处理完毕try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}// 从任务存储中读取任务并重新提交List<Runnable> storedTasks = TaskStorage.getStoredTasks();if (!storedTasks.isEmpty()) {System.out.println("Re-submitting stored tasks...");for (Runnable task : storedTasks) {executor.execute(task);}}// 关闭线程池executor.shutdown();}
}
向线程池提交任务
可以使用两个方法向线程池提交任务,分别为execute和submit方法。
execute
(1)execute
方法ThreadPoolExecutor中的方法execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute传入的任务是一个Runnable类的实例
submit
(2)submit方法submit相关的方法有三种,用于提交需要返回值的任务。任务提交后,线程池会立即返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过它的get方法来获取任务的计算结果。执行get
方法会阻塞当前线程直到任务完成,而使用get(longtimeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后返回,这时候有可能任务没有执行完。
submit 方法:
- 提交需要返回值的任务:submit 方法用于提交需要返回值的任务。
submit(Runnable task)
:提交一个 Runnable 任务,返回 Future<?> 对象。submit(Runnable task, T result)
:提交一个 Runnable 任务,并指定任务完成后的结果,返回Future<T>
对象。submit(Callable<T> task)
:提交一个 Callable 任务,返回Future<T>
对象。
Future 对象:
-
通过
get()
方法获取结果:调用 get() 方法获取任务的计算结果。 -
阻塞当前线程直到任务完成:调用 get() 方法会阻塞当前线程,直到任务完成。
-
get(long timeout, TimeUnit unit)
方法:调用 get(long timeout, TimeUnit unit) 方法会阻塞当前线程一段时间后返回,如果在这段时间内任务没有完成,可能会抛出 TimeoutException。 -
阻塞当前线程一段时间后返回:调用
get(long timeout, TimeUnit unit)
方法会阻塞当前线程一段时间后返回,如果任务在这段时间内没有完成,可能会抛出TimeoutException。
关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池
shutdown
- 线程池不再接收新的任务:调用 shutdown() 方法后,线程池不再接收新的任务。
- 所有任务(包括正在执行的和等待队列中的)完成后,线程池关闭:线程池会等待所有任务(包括正在执行的和等待队列中的)完成后,才会关闭。
shutdownNow
- 线程池不再接收新的任务:调用 shutdownNow() 方法后,线程池不再接收新的任务。
- 等待队列中的任务将不会被执行:等待队列中的任务将不会被执行。
- 尝试中断正在运行的线程:线程池会尝试中断正在运行的线程。
- 无法响应中断的任务无法终止:如果任务无法响应中断,它们将无法终止。
- 只能等待任务执行完毕后,才能关闭线程池:对于无法响应中断的任务,只能等待它们执行完毕后,线程池才能关闭。
isShutdown
返回 true:只要调用了 shutdown() 或 shutdownNow() 方法,isShutdown 方法就会返回 true。
isTerminated
返回 true:当所有的任务都已关闭后,isTerminated 方法会返回 true。
使用建议
- 通常调用 shutdown 方法来关闭线程池:如果任务重要且需要执行完,建议使用 shutdown 方法。
- 如果任务不重要不一定要执行完,则可以调用 shutdownNow 方法:如果任务不重要或可以中断,可以使用 shutdownNow 方法。
ThreadPoolExecutor源码解析
线程池的5种状态
工作线程workers
execute提交任务
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** 执行过程分为三个步骤:** 1. 如果当前运行的线程数少于核心线程数,尝试启动一个新的线程,* 并将给定的命令作为新线程的第一个任务。调用 addWorker 方法会原子地检查 runState 和 workerCount,* 从而防止在不应该添加线程时错误地添加线程,通过返回 false 来避免这种情况。** 2. 如果任务可以成功入队,我们仍然需要双重检查是否应该添加线程(因为自上次检查以来现有线程可能已经死亡)* 或者线程池是否在进入此方法后已关闭。因此,我们需要重新检查状态,如果线程池已停止,则回滚入队操作;* 如果没有线程在运行,则启动一个新线程。** 3. 如果任务无法入队,我们尝试添加一个新线程。如果失败,我们知道线程池已关闭或已饱和,* 因此拒绝该任务。*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 当前工作线程数少于核心线程数if (addWorker(command, true))return; // 成功添加工作线程,直接返回c = ctl.get(); // 重新获取控制状态}if (isRunning(c) && workQueue.offer(command)) {// 线程池正在运行且任务成功入队int recheck = ctl.get(); // 重新获取控制状态if (!isRunning(recheck) && remove(command)) {// 线程池已停止且任务已从队列中移除reject(command); // 拒绝任务} else if (workerCountOf(recheck) == 0) {// 重新检查工作线程数是否为0addWorker(null, false); // 添加一个空闲工作线程}} else if (!addWorker(command, false)) {// 任务无法入队且无法添加新线程reject(command); // 拒绝任务}
}
addWorker创建并执行工作线程
/*** 检查根据当前线程池状态和给定的边界(核心或最大),是否可以添加新的工作线程。如果可以,相应地调整工作线程的数量,* 并且如果可能,创建并启动一个新的工作线程,运行 `firstTask` 作为其第一个任务。如果线程池已停止或符合关闭条件,* 或者线程工厂无法创建线程时,此方法返回 false。如果线程创建失败(通常是由于线程工厂返回 null 或者在 `Thread.start()` 时抛出异常,如 `OutOfMemoryError`),* 我们会干净地回滚。** @param firstTask 新线程应首先运行的任务(如果没有则为 null)。当线程池中的线程少于核心线程数时(在这种情况下我们总是启动一个新线程),* 或者当队列已满时(在这种情况下我们必须绕过队列),工作线程会在 `execute()` 方法中创建并带有初始的第一个任务。* 初始空闲线程通常通过 `prestartCoreThread` 创建,或者用于替换其他即将死亡的工作线程。** @param core 如果为 true,则使用核心线程数作为边界,否则使用最大线程数。这里使用布尔指示器而不是值,以确保在检查其他线程池状态后读取最新的值。* @return 如果成功则返回 true*/
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 必要时检查队列是否为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // 重新读取 ctlif (runStateOf(c) != rs)continue retry;// 否则,CAS 失败是因为工作线程数量发生变化;重试内部循环}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 持有锁时重新检查// 如果线程工厂失败或在获取锁之前已关闭,则回退int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 预检查 t 是否可启动throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
Worker类
addWorker方法只是构造了一个Worker,并且把firstTask封装到Worker中。
每个Worker都是一条线程,包含了一个firstTask初始化时要被首先执行的任务。最终执行任务的是runWorker()方法。
属性
-
Worker类继承了AQS,并实现了Runnable接口。
-
firstTask字段用来保存传入的任务;表示 Worker 被创建时需要运行的第一个任务。如果为 null,则表示 Worker 是空闲的
-
thread字段,是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。表示实际的工作线程。每个 Worker 实例都对应一个 Thread 实例
-
volatile long completedTasks: 记录该 Worker 已完成的任务数量。这是一个 volatile 变量,确保多线程环境下的可见性和有序性。
构造方法
在调用构造方法时,需要传入任务,这里通过getThreadFactory(). newThread(this)来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
runWorker方法
在 run 方法中,调用 runWorker(this) 方法来执行任务
public void run() {runWorker(this);
}
/*** 主工作线程循环。重复从队列中获取任务并执行它们,同时处理多个问题:** 1. 我们可能从一个初始任务开始,在这种情况下,我们不需要获取第一个任务。否则,只要线程池在运行,我们就从 getTask 获取任务。* 如果 getTask 返回 null,则由于线程池状态或配置参数的变化,工作线程退出。其他退出情况是由于外部代码抛出异常,此时 completedAbruptly 为 true,* 通常会导致 processWorkerExit 替换这个线程。** 2. 在运行任何任务之前,获取锁以防止任务执行期间其他线程中断,然后确保除非线程池正在停止,否则该线程不会被中断。** 3. 每个任务运行之前都会调用 beforeExecute,可能会抛出异常,这种情况下我们会使线程终止(通过设置 completedAbruptly 为 true 退出循环)而不处理任务。** 4. 假设 beforeExecute 正常完成,我们运行任务,并收集其抛出的所有异常以传递给 afterExecute。* 我们分别处理 RuntimeException、Error(规范保证我们捕获这两种异常)和任意的 Throwable。* 由于我们不能在 Runnable.run 中重新抛出 Throwable,我们在退出时将其包装在 Error 中(传递给线程的 UncaughtExceptionHandler)。* 任何抛出的异常也会保守地导致线程终止。** 5. 在 task.run 完成后,我们调用 afterExecute,它也可能抛出异常,这同样会导致线程终止。* 根据 JLS 第 14.20 节,即使 task.run 抛出异常,这个异常也是有效的。** 异常机制的总体效果是,afterExecute 和线程的 UncaughtExceptionHandler 可以获得关于用户代码遇到的问题的最准确信息。** @param w 工作线程*/
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// 如果线程池正在停止,确保线程被中断;// 否则,确保线程不被中断。这需要在第二种情况下重新检查,以处理清除中断时的 shutdownNow 竞争条件if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
初始化:
- 使用 retry 标签开始一个无限循环。
- 获取当前控制状态 c 和运行状态 rs。
检查线程池状态:
- 如果线程池状态大于等于 SHUTDOWN,并且不符合特定条件(即线程池处于 SHUTDOWN 状态且没有初始任务且队列不为空),则返回 false。
检查工作线程数量:
- 获取当前工作线程数量 wc。
- 如果工作线程数量达到容量上限或超过核心/最大线程数,则返回 false。
- 使用 compareAndIncrementWorkerCount 尝试增加工作线程数量,如果成功则跳出 retry 循环。
- 如果 compareAndIncrementWorkerCount 失败,重新读取控制状态 c 并检查运行状态 rs,如果运行状态发生变化则继续 retry 循环。
创建和启动工作线程:
- 尝试创建一个新的 Worker 对象 w。
- 获取 Worker 的线程 t。
- 如果线程 t 不为 null,则持有主锁 mainLock。
- 重新检查线程池状态,如果线程池未停止或处于 SHUTDOWN 状态且没有初始任务,则检查线程是否已启动。
- 将 Worker 添加到工作线程列表 workers,更新最大线程数 largestPoolSize,并设置 workerAdded 为 true。
- 释放主锁 mainLock。
- 如果 workerAdded 为 true,启动线程 t 并设置 workerStarted 为 true。
异常处理:
- 如果 workerStarted 为 false,调用 addWorkerFailed 方法进行回滚处理。
返回结果:
- 返回 workerStarted,表示是否成功启动了新的工作线程
getTask() 从任务队列中获取下一个任务
/*** 根据当前配置设置执行阻塞或定时等待任务,或者在以下情况下返回 null:* 1. 工作线程数超过 maximumPoolSize(由于调用了 setMaximumPoolSize)。* 2. 线程池已停止。* 3. 线程池已关闭且任务队列为空。* 4. 此工作线程在等待任务时超时,并且超时的工作线程将被终止(即,* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})* 在定时等待前后,如果队列不为空,则此工作线程不是池中的最后一个线程。** @return 任务,或者如果工作线程必须退出,则返回 null,在这种情况下 workerCount 将递减*/
private Runnable getTask() {boolean timedOut = false; // 上一次 poll() 是否超时?for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果必要,检查队列是否为空。if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 工作线程是否可以被裁剪?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
processWorkerExit
/*** 执行即将死亡的工作线程的清理和账簿记录。仅从工作线程调用。除非设置了 completedAbruptly,* 否则假设 workerCount 已经调整以反映退出。此方法从工作线程集合中移除线程,* 并可能终止线程池或替换工作线程,如果它因用户任务异常退出,或者运行中的工作线程少于* corePoolSize,或者队列不为空但没有工作线程。** @param w 即将退出的工作线程* @param completedAbruptly 如果工作线程因用户异常而死亡*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果突然退出,则 workerCount 没有调整decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; // 增加已完成任务计数workers.remove(w); // 从工作线程集合中移除该工作线程} finally {mainLock.unlock();}tryTerminate(); // 尝试终止线程池int c = ctl.get();if (runStateLessThan(c, STOP)) { // 如果线程池状态小于 STOPif (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 计算最小工作线程数if (min == 0 && !workQueue.isEmpty())min = 1; // 如果最小工作线程数为 0 且队列不为空,则最小工作线程数设为 1if (workerCountOf(c) >= min)return; // 如果当前工作线程数大于等于最小工作线程数,则不需要替换}addWorker(null, false); // 添加新的工作线程}
}
关闭线程池
shutdown
/*** 开始有序关闭,在此过程中之前提交的任务将继续执行,但不再接受新任务。* 如果已经关闭,则调用此方法不会产生额外效果。** <p>此方法不会等待之前提交的任务完成执行。使用 {@link #awaitTermination awaitTermination} 来实现这一点。** @throws SecurityException {@inheritDoc}*/
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 检查关闭权限advanceRunState(SHUTDOWN); // 将运行状态推进到 SHUTDOWNinterruptIdleWorkers(); // 中断空闲的工作线程onShutdown(); // 为 ScheduledThreadPoolExecutor 提供的钩子方法} finally {mainLock.unlock(); // 释放主锁}tryTerminate(); // 尝试终止线程池
}
shutdownNow
/*** 尝试停止所有正在执行的任务,停止处理等待中的任务,并返回一个包含等待执行任务的列表。* 这些任务在方法返回时将从任务队列中被移除。** <p>此方法不会等待正在执行的任务终止。使用 {@link #awaitTermination awaitTermination} 来实现这一点。** <p>除了尽力尝试停止正在执行的任务外,没有其他保证。此实现通过 {@link Thread#interrupt} 取消任务,* 因此任何不响应中断的任务可能永远不会终止。** @throws SecurityException {@inheritDoc}*/
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 检查关闭权限advanceRunState(STOP); // 将运行状态推进到 STOPinterruptWorkers(); // 中断所有工作线程tasks = drainQueue(); // 从任务队列中移除并返回等待的任务} finally {mainLock.unlock(); // 释放主锁}tryTerminate(); // 尝试终止线程池return tasks; // 返回等待的任务列表
}