高并发 - 2.线程池
线程池
- 1.Java线程创建非常昂贵,需要JVM和OS(操作系统)配合完成大量的工作
- 1.必须为线程堆栈分配和初始化大量内存块,其中包含至少1MB的栈内存
- 2.需要进行系统调用,以便在OS(操作系统)中创建和注册本地线程
- 2.Java高并发应用频繁创建和销魂线程的操作是非常低效的,而且是不被编程规范所允许的,必须使用线程池
- 1.提升性能:线程池能独立负责线程的创建,维护和分配。在执行大量异步任务时,可以不需要自己创建线程,而是将任务交给线程池去调度。线程池能尽可能使用空闲的线程去执行异步任务,最大限度地对已经创建地线程进行复用,使得性能提升明显
- 3.线程管理,每个Java线程池会保持一些基本地线程统计信息,例如完成的任务数量、空闲时间等,以便对线程进行有效管理,使得能对所接收到的异步任务进行高效调度。
- 4.在主要大厂的编程规范中,不允许在应用中自行显式地创建线程,线程必须通过线程池提供。由于创建和销毁线程需要时间以及系统资源开销,使用线程池的好处是减少这些开销,解决资源不足的问题。此外,在现实的互联网项目开发中,一般的做法是高并发接口单独线程池隔离处理,也就是,比如说有多个高并发接口,需要为每-个接口单独配置一个线程池(每个接口的业务特点不同,因此各自所配置的线程池参数也不尽相同)
1.JUC的线程池架构
- 1.多线程编程中,任务都是一些抽象且离散的工作单元,而
线程是使任务异步执行的基本机制
。随着应用的扩张,线程和任务管理也变得非常复杂,为了简化这些复杂的线程管理模式,需要一个管理者来统一管理线程以及任务分配,这就是线程池- 2.创建和管理复杂的线程,线程之所以复杂,是因为线程是任务异步执行的基本机制,而这些异步执行任务都非常的抽象且离散,所以对应的线程也是纷繁复杂的,所以需要线程池统一管理。
- 3.JUC中有关线程池的类与接口的架构图
- 4.JUC是java.util.concurrent工具包的简称,该工具包是从JDK1.5开始加入JDK,用于完成高并发,处理多线程的一个工具包
- 4.
Java
中和线程池相关的主要接口和类
- 1.
Executor
接口- 2.
ExecutorService
接口- 3.
Executors
类- 3.
ThreadPoolExecutor
类
1.Executor接口
package java.util.concurrent;public interface Executor {// 执行一个Runnable类型的target执行目标实例,无返回void execute(Runnable command); }
- 1.
Exector
接口位于JUC
包中,其是一个顶层接口并只声明了一个方法execute(Runnable)
- 2.
execute(Runnable)
方法
- 1.接收
Runnable
对象作为参数,并且以异步的方式执行,该命令可以在一个新线程中,一个池线程中或调用线程中执行,由Executor
实现决定- 3.Executor是Java异步目标任务的“执行者”接口,其目标是执行目标任务。“执行者”Executor提供了execute()接口来执行已提交的Runnable执行目标实例。Executor作为执行者的角色,其目的是提供一种将“任务提交者”与“任务执行者”分离开来的机制
- 3.
ThreadPoolExecutor
实现类具体实现步骤
- 1.当工作线程数小于核心线程数时会创建核心线程
- 2.如果工作线程数大于等于核心线程数时会尝试将任务添加进队列
- 1.如果成功会对线程池的状态进行二次验证(因为可能存在刚好线程池的状态发生改变的情况)
- 2.二次验证时如果线程池不是处于
RUNNING
的状态则会将任务从队列中移除
- 1.如果移除失败:则会判断工作线程是否为
0
,如果过为0
就创建一个非核心线程(只要是RUNNING
的状态就一定要保证有工作线程还在)- 2.如果移除成功:则执行拒绝策略,因为线程池已经不可用了
- 3.二次验证时如果线程池处于
RUNNING
的状态会判断工作线程是否为0
,如果过为0
就创建一个非核心线程;- 3.如果失败说明队列已满,接着就去创建新的线程,如果大于最大线程数,则执行拒绝策略
2.ExecutorService接口
package java.util.concurrent; import java.util.List; import java.util.Collection;public interface ExecutorService extends Executor {// 启动有序关机,其中执行先前提交的任务,但不接受新任务。如果调用已经关闭,则没有额外的效果void shutdown();// 尝试停止所有正在执行的任务,停止对等待任务的处理,并返回等待执行的任务列表,此方法不等待正在执行的任务终止,而是使用awaitTermination来完成List<Runnable> shutdownNow();// 如果执行器已关闭,则返回trueboolean isShutdown();// 如果执行器已终止(即所有任务都完成),则返回trueboolean isTerminated();// 等待任务终止,如果超过指定时间则返回falseboolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;// 提交一个有返回值的任务以供执行,并返回表示该任务的挂起结果的Future,Future的get方法将在任务成功完成时返回任务的结果<T> Future<T> submit(Callable<T> task);// 提交一个Runnable任务和一个结果值,当任务执行完成后,返回该结果值,注意该结果值是在任务执行前就确定的,与任务实际结果无关<T> Future<T> submit(Runnable task, T result);// 提交一个Runnable任务,并返回一个Future对象,由于Runnable任务没有返回值,所以该Future对象的get方法将返回nullFuture<?> submit(Runnable task);// 批量提交Callable任务,并返回一个Future对象的列表,当所有任务都完成后,可以通过这些Future对象获取任务的返回值,如果某个任务执行失败,那么对应的Future对象的get方法将抛出ExecutionException异常。这个方法会等待所有任务都完成后才返回<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;// 上述方法的重载版本,可以设置等待超时时间<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;// 批量提交Callable任务,并返回第一个成功完成的任务的返回值,当找到第一个成功完成的任务后,该方法会立即返回而不会等待其他任务完成,如果所有任务都失败,那么该方法将抛出ExecutionException异常<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;// 上述方法的重载版本,可以设置等待超时时间<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }
- 1.
ExecutorService
接口位于JUC
包中,其继承Executor
接口,因此继承了父接口的execute()
抽象方法- 是Java异步目标任务的“执行者服务接”口,对外提供异步任务的接收服务。ExecutorService提供了“接收异步任务并转交给执行者”的方法,如submit系列方法、invoke系列方法等
- 2.
ExecutorService
是Java
提供的一个线程池接口,每次异步执行target
目标任务可通过ExecutorService
线程池实例负责对池中的线程进行管理和调度,并且可以有效控制最大并发线程数,提高系统资源的使用率,同时提供定时执行,单线程,并发数控制等功能
- 3.
ExecutorService
线程池的execute
与submit
方法的区别
- 1.
submit
可接收两种入参:无返回值的Runnable
类型的target
执行目标实例和有返回值的Callable
类型的target
执行目标实例;而execute
仅接收无返回值的target
执行目标实例或无返回值的Thread
实例- 2.
submit
有返回值,submit
方法在提交异步target
执行目标之后会返回Future
异步任务实例,以便对target
的异步执行过程进行控制;而execute
没有,target
执行目标实例在执行之后没有办法对其异步执行过程进行控制,只能任其执行直到其执行结束
1.shutdown()方法
/*** 关闭线程池(不再接收新任务,但已提交的任务会全部被执行)* 但不会等待任务彻底的执行完成(awaitTermination)*/public void shutdown() {final ReentrantLock mainLock = this.mainLock;// shutdown操作中涉及大量的资源访问和更新,直接通过互斥锁防并发mainLock.lock();try {// 用于shutdown/shutdownNow时的安全访问权限checkShutdownAccess();// 将线程池状态从RUNNING推进到SHUTDOWNadvanceRunState(SHUTDOWN);// shutdown不会立即停止所有线程,而仅仅先中断idle状态的多余线程进行回收,还在执行任务的线程就慢慢等其执行完interruptIdleWorkers();// 单独为ScheduledThreadPoolExecutor开的一个钩子函数(hook for ScheduledThreadPoolExecutor)onShutdown();} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();}/*** 用于shutdown/shutdownNow时的安全访问权限* 检查当前调用者是否有权限去通过interrupt方法去中断对应工作线程* */private void checkShutdownAccess() {// 判断jvm启动时是否设置了安全管理器SecurityManagerSecurityManager security = System.getSecurityManager();// 如果没有设置,直接返回无事发生if (security != null) {// 设置了权限管理器,验证当前调用者是否有modifyThread的权限// 如果没有,checkPermission会抛出SecurityException异常security.checkPermission(shutdownPerm);// 通过上述校验,检查工作线程是否能够被调用者访问final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (MyWorker w : workers) {// 检查每一个工作线程中的thread对象是否有权限被调用者访问security.checkAccess(w.thread);}} finally {mainLock.unlock();}}}/*** 中断所有处于idle状态的线程* */private void interruptIdleWorkers() {// 默认打断所有idle状态的工作线程interruptIdleWorkers(false);}private static final boolean ONLY_ONE = true;/*** 中断处于idle状态的线程* @param onlyOne 如果为ture,至多只中断一个工作线程(可能一个都不中断)* 如果为false,中断workers内注册的所有工作线程* */private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (MyWorker w : workers) {Thread t = w.thread;// 1. t.isInterrupted(),说明当前线程存在中断信号,之前已经被中断了,无需再次中断// 2. w.tryLock(), runWorker方法中如果工作线程获取到任务开始工作,会先进行Lock加锁// 则这里的tryLock会加锁失败,返回false。 而返回true的话,就说明当前工作线程是一个idle(空闲)线程,需要被中断if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {// tryLock成功时,会将内部state的值设置为1,通过unlock恢复到未加锁的状态w.unlock();}}if (onlyOne) {// 参数onlyOne为true,至多只中断一个工作线程// 即使上面的t.interrupt()没有执行,也在这里跳出循环break;}}} finally {mainLock.unlock();}}/*** 单独为jdk的ScheduledThreadPoolExecutor开的一个钩子函数* 由ScheduledThreadPoolExecutor继承ThreadExecutor时重写(包级别访问权限)* */void onShutdown() {}
- 1.
shutdown
方法在入口处使用mainLock
加锁后,通过checkShutdownAccess
方法检查当前是否有权限访问工作线程(前提是设置了SecurityManager
),如果无权限则会抛出SecurityException
异常- 2.通过
advanceRunState
方法将线程池状态
推进到SHUTDOWN
:即停止接收新submit
任务,已经提交的任务(包括正在执行和队列中等待)会继续执行完成,直到所有的全部执行完成才真正停止- 3.注意
tryLock
方法通过尝试加锁判断是否为工作线程,如果是工作线程则不进行处理,否则如果是空闲线程则尝试通过Thread.interrupt
方法打断- 4.该方法非阻塞,不会等待任务彻底执行完成从而阻塞当前线程
2.shutdowNow()
/*** 立即关闭线程池(不再接收新任务,工作队列中未完成的任务会以列表的形式返回)* @return 当前工作队列中未完成的任务* */public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;// shutdown操作中涉及大量的资源访问和更新,直接通过互斥锁防并发mainLock.lock();try {// 用于shutdown/shutdownNow时的安全访问权限checkShutdownAccess();// 将线程池状态从RUNNING推进到STOPadvanceRunState(STOP);interruptWorkers();// 将工作队列中未完成的任务提取出来(会清空线程池的workQueue)tasks = drainQueue();} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();return tasks;}/*** shutdownNow方法内,立即终止线程池时该方法被调用* 中断通知所有已经启动的工作线程(比如等待在工作队列上的idle工作线程,或者run方法内部await、sleep等,令其抛出中断异常快速结束)* */private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (MyWorker w : workers) {// 遍历所有的worker线程,已启动的工作线程全部调用Thread.interrupt方法,发出中断信号w.interruptIfStarted();}} finally {mainLock.unlock();}}/*** 将工作队列中的任务全部转移出来* 用于shutdownNow紧急关闭线程池时将未完成的任务返回给调用者,避免任务丢失* */private List<Runnable> drainQueue() {BlockingQueue<Runnable> queue = this.workQueue;ArrayList<Runnable> taskList = new ArrayList<>();queue.drainTo(taskList);// 通常情况下,普通的阻塞队列的drainTo方法可以一次性的把所有元素都转移到taskList中// 但jdk的DelayedQueue或者一些自定义的阻塞队列,drainTo方法无法转移所有的元素// (比如DelayedQueue的drainTo方法只能转移已经不需要延迟的元素,即getDelay()<=0)if (!queue.isEmpty()) {// 所以在这里打一个补丁逻辑:如果drainTo方法执行后工作队列依然不为空,则通过更基础的remove方法把队列中剩余元素一个一个的循环放到taskList中for (Runnable r : queue.toArray(new Runnable[0])) {if (queue.remove(r)) {taskList.add(r);}}}return taskList;}
- 1.
shutdownNow
方法在入口处使用mainLock
加锁后,通过checkShutdownAccess
方法检查当前是否有权限访问工作线程(前提是设置了SecurityManager
),如果无权限则会抛出SecurityException
异常- 2.通过
advanceRunState
方法将线程池状态推进到STOP
:即停止接收新submit
任务,尝试将正在执行的(包括正在执行和队列中等待)任务通过interrupt
方法中断,并通过drainQueue
方法返回未执行的任务列表- 3.注意
shutdownNow
方法试图终止线程的方法是通过Thread.interrupt()
方法来实现,该方法作用有限,如果线程中没有sleep,wait,join,condition,定时锁
等应用,interrupt()
方法是无法中断当前的线程的,所以sutdownNow
方法并不代表线程池一定能立即退出,也可能必须要等待所有正在执行的任务都执行完了才能退出,但是大多数是能立即退出- 4.该方法非阻塞,不会等待任务彻底执行完成从而阻塞当前线程
3.awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {// 将超时时间转化为纳秒单位long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {// 如果线程池已经是TERMINATED状态,返回trueif (runStateAtLeast(ctl.get(), TERMINATED))return true;// 超时但是线程池未关闭,返回falseif (nanos <= 0)return false;// 实现阻塞效果nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}}
- 1.
awaitTermination
方法不会更改线程池状态,不会尝试终止线程池,也不会阻止接收新任务- 2.
awaitTermination
方法执行时会阻塞调用线程,直到所有已提交的任务(包括正在跑的和队列中等待的)执行完,或等待超时时间到了(timeout
和TimeUnit
设定的时间),然后监测ExecutorService
是否已经关闭,如果所有任务执行完毕则返回true
,否则返回false
2.submit(Runnable task)方法
- 1.接收一个
Runnable
的实现作为参数并返回一个Future
对象- 2.该
Future
对象可用于判断Runnable
是否结束执行,因为其get()
方法是阻塞执行- 3.因为
sleep()
方法需要抛出受检异常,所以通过@SneakyThrows
进行捕获,其中的@SneakyThrows
注解可参考https://blog.csdn.net/qq_22162093/article/details/115486647
3.submit(Callable< T > task)
- 1.类似
submit(Runnable task)
方法,区别在于接收不同的参数类型- 2.
Callable
的call()
方法可以返回结果且可以抛出异常,Runnable
的run()
方法不能返回结果也不能抛出异常- 3.
Callable
的返回值可以从返回的Future
对象中获取
4.inVokeAny()
- 1.接收一个包含
Callable
对象的集合作为参数- 2.调用该方法不会返回
Future
对象,而是返回集合中某一个Callable对象的结果- 3.无法保证调用之后返回的结果属于哪个
Callable
,只知道属于该Callable 集合中的一个执行结束的Callable对象- 4.如果一个任务运行完毕或抛出异常则会取消其它的
Callable
的执行
5.invokeAll()
- 1.接收一个包含
Callable
对象的集合作为参数- 2.调用参数集合中的所有
Callable
对象,并且返回一个包含Future
对象的集合,通过该集合来管理每个Callable
的执行结果- 3.任务有可能因为异常而导致运行结束,所以可能并不是真的成功运行,但是没有办法通过
Future
对象来感知到这个差异,可以通过捕获异常获取执行失败的任务,且一个任务异常不会影响其他任务的正常执行
6.
ExecuteService
的关闭
- 1.使用
ExecutorService
完毕之后应该关闭它,这样才能保证线程不会继续保持运行状态- 2.如果程序通过
main()
方法启动并且主线程退出了程序,如果还有活动的ExecutorService
存在程序中则程序将会继续保持运行状态,存在于ExecutorService
中的活动线程会阻止Java
虚拟机关闭- 3.为了关闭在
ExecutorService
中的线程可以调用shutdown()
方法,ExecutorService
并不会马上关闭所,而是不再接收新的任务,一旦所有的线程结束执行当前任务,ExecutorServie
才会真的关闭,所有在调用shutdown()
方法之前提交到ExecutorService
的任务都会执行- 4.如果希望立即关闭
ExecutorService
可以调用shutdownNow()
方法,该方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务,但是对于正在执行的任务,是否能够成功关闭它是无法保证的,有可能真的被关闭掉了也有可能会一直执行到任务结束
3.AbstractExecutorService
- 1.AbstractExecutorService是一个抽象类,它实现了ExecutorService接口,AbstractExecutorService存在的目的是为ExecutorService中的接口提供默认实现
4.ThreadPoolExecutor
- 1.ThreadPoolExecutor就是大名鼎鼎的线程池实现类,它继承于AbstractExecutorService抽象类
- 2.ThreadPoolExecutor是JUC线程池的核心实现类,线程的创建和终止需要很大的开销,线程池中预先提供了指定数量的可重用线程,所以使用线程池会节省系统资源,并且每个线程池都维护了一些基础的数据统计,方便线程的管理和监控
5.ScheduledExecutorService
- 1.ScheduledExecutorService是一个接口,它继承于ExecutorService
- 2.它是一个可用完成
延时
和周期性
任务的调度线程池接口,其功能和Timer/TimerTask类似
6.ScheduledThreadPoolExecutor
- 1.ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,它提供了ScheduledExecutorService线程池接口中的延时执行和周期执行等抽象调度方法的具体实现
- 2.ScheduledThreadPoolExecutor类似于Timer,但是在高并发程序中,ScheduledThreadPoolExecutor的性能要优于Timer
2.Executors类
public class Executors {public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}public static ExecutorService newWorkStealingPool(int parallelism) {return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);}public static ExecutorService newWorkStealingPool() {return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);}public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();}public static ThreadFactory privilegedThreadFactory() {return new PrivilegedThreadFactory();}public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}public static Callable<Object> callable(Runnable task) {if (task == null)throw new NullPointerException();return new RunnableAdapter<Object>(task, null);}/** Cannot instantiate. */private Executors() {} }
- 1.
Java
中提供了Executors
类,其采用工厂设计模式,相当于一个线程池工厂(注意
:是线程池工厂而不是线程工厂),提供了多种不同的线程池可供直接使用,即创建线程池的方法都是静态方法可以直接使用类名调用就能获取
,实际生产环境禁止直接使用Executors
创建线程池- Executors是一个静态工厂类,它通过静态工厂方法返回ExecutorService、ScheduledExecutorService等线程池示例对象,这些静态工厂方法可以理解为一些快捷的创建线程池的方法。
- 在实际生产环境中,通常禁止使用 Executors创建线程池的原因是因为 Executors 提供的线程池创建方式有一些缺点,可能会导致一些问题。没有明确的线程池大小限制:Executors默认的线程池大小是无限制的,这可能会导致线程数过多,造成系统负载过高,影响系统性能。因此,在实际生产环境中,应该明确线程池的大小限制,根据系统的负载情况合理地分配线程池大小。使用无界队列:Executors 默认使用无界队列,这可能会导致队列中任务积压过多,进而导致内存泄漏等问题。因此,在实际生产环境中,应该使用有界队列,以控制任务的数量和限制资源的使用。 没有明确的线程池类型:Executors 创建线程池时,没有明确指定线程池类型,这可能会导致线程池的执行方式与系统的实际需求不符。因此,在实际生产环境中,应该根据系统的实际需求,选择合适的线程池类型,例如ThreadPoolExecutor。综上所述,虽然Executors创建线程池非常方便,但是在实际生产环境中,为了确保系统的性能和稳定性,应该根据系统的实际需求,明确线程池的大小限制、使用有界队列以及选择合适的线程池类型。
- 2.
newSingleThreadExecutor
- 1.创建一个单线程的线程池,返回类型为
ExecutorService
- 2.其本质是调用
ThreadPoolExecutor
类的构造方法并固定线程数为1
- 3.使用此方法创建的线程池能保证所有任务按照指定顺序(
FIFO
)执行- 和newFixedThreadPool(1)方法的区别返回线程池被FinalizableDelegatedExecutorService,执行线程不可被修改
- 单线程的线程池中的任务是按照提交的次序顺序执行的
- 池中的唯一线程的存活时间是无限的
- 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列,并且其阻塞队列是无界的
- 无界队列,就是没有固定大小的队列,不过它并不是像我们理解的那种元素没有任何限制,而是它的元素存储量很大,像LinkedBlockingQueue,它的默认队列长度是Integer.Max Value,所以我们感知不到它的长度限制,
- 总体来说,单线程化的线程池所适用的场景是:任务按照提交次序,一个任务一个任务地逐个执行的场景。以上用例在最后调用shutdown()方法来关闭线程池。执行shutdown()方法后,线程池状态变为SHUTDOWN,此时线程池将拒绝新任务,不能再往线程池中添加新任务,否则会抛出RejectedExecutionException异常。此时,线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成才会退出。还有一个与shutdown()类似的方法,叫作shutdownNow(),执行shutdownNow()方法后,线程池状态会立刻变成STOP,并试图停止所有正在执行的线程,并且不再处理还在阻塞队列中等待的任务,会返回那些未执行的任务。
- 3.
newFixedThreadPool
- 1.创建一个指定数量的线程池,返回类型为
ExecutorService
- 2.其本质是调用
ThreadPoolExecutor
类的构造方法并指定线程数- 3.当线程池中有可用线程,提交的任务就会立即执行,当前线程中没有可用线程,则会将任务放入到一个队列中直到有线程可用
- 该方法用于创建一个“固定数量的线程池”,其唯一的参数用于设置池中线程的“固定数量”
- 试用例中,创建了一个线程数为3的“固定数量线程池”,然后向其中提交了10个任务。从输出结果可以看到,该线程池同时只能执行3个任务,剩余的任务会排队等待
- “固定数量的线程池”的特点大致如下:(1)如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量。(2)线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。(3)在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)。固定数量的线程池”的适用场景:需要任务长期执行的场景。“固定数量的线程池”的线程数能够比较稳定地保证一个数,能够避免频繁回收线程和创建线程,故适用于处理CPU密集型的任务,在CPU被工作线程长时间占用的情况下,能确保尽可能少地分配线程。“固定数量的线程池”的弊端:内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无限增大,使服务器资源迅速耗尽。
- 4.
newCachedThreadPool
- 1.创建一个可缓存的线程池,返回类型为
ExecutorService
- 2.其本质是调用
ThreadPoolExecutor
类的构造方法- 3.该线程池不对线程的数量做限制,只要有线程任务没有线程来处理,就会创建一个线程,同时该线程池有一个回收的功能,如果某个线程超过
60
秒还没有任务,就会被自动回收掉- 该方法用于创建一个“可缓存线程池”,如果线程池内的某些线程无事可干成为空闲线程,“可缓存线程池”可灵活回收这些空闲线程。
- 1)在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务。
- 2.此线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
- 3.如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程。
- 4.可缓存线程池”的适用场景:需要快速处理突发性强、耗时较短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景。“可缓存线程池”的线程数量不固定,只要有空闲线程就会被回收;接收到的新异步任务执行目标,查看是否有线程处于空闲状态,如果没有就直接创建新的线程。
- 5.可缓存线程池”的弊端:线程池没有最大线程数量限制,如果大量的异步任务执行目标实例同时提交,可能会因创建线程过多而导致资源耗尽。
- 5.
newScheduledThreadPool
- 1.创建一个可用于执行周期性任务的线程池,返回类型为
ScheduledExecutorService
- 2.其本质是调用
ScheduledThreadPoolExecutor
类的构造方法- 3.该方法用于创建一个“可调度线程池”,即一个提供“延时”和“周期性”任务调度功能的ScheduledExecutorService类型的线程池。Executors提供了多个创建“可调度线程池”的工厂方法
//方法一:创建一个可调度线程池,池内仅含有一个线程public static ScheduledExecutorService newSingleThreadScheduledExecutor();//方法二:创建一个可调度线程池,池内含有N个线程,N的值为输入参数corePoolSizepublic static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) ;
newSingleThreadScheduledExecutor工厂方法所创建的仅含有一个线程的可调度线程池适用于调度串行化任务,也就是一个任务一个任务地串行化调度执行。调用Executors.newScheduledThreadPool(int corePoolSize)快捷工厂方法创建一个“可调度线程池”的测试用例
newScheduledThreadPool工厂方法可以创建一个执行“延时”和“周期性”任务的可调度线程池,所创建的线程池为ScheduleExecutorService类型的实例。ScheduleExecutorService接口中有多个重要的接收被调目标任务的方法,其中scheduleAtFixedRate和scheduleWithFixedDelay使用得比较多。ScheduleExecutorService中接收被调目标任务的方法之一scheduleAtFixedRate的定义如下:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, //异步任务target执行目标实例long initialDelay, //首次执行延时long period, //两次开始执行最小间隔时间TimeUnit unit //所设置的时间的计时单位,如TimeUnit.SECONDS常量);
ScheduleExecutorService中接收被调目标任务的方法之二
scheduleWithFixedDelay的定义如下
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, //异步任务target执行目标实例long initialDelay, //首次执行延时long delay, //前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)TimeUnit unit //所设置的时间的计时单位,如TimeUnit.SECONDS常量);
当被调任务的执行时间大于指定的间隔时间时,ScheduleExecutorService并不会创建一个新的线程去并发执行这个任务,而是等待前一次调度执行完毕
如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会同时执行
这句话很容易产生歧义,总之意思就是,可调度的线程,执行该方法时,执行B必须等待执行A结束后并满足延迟时长后才会继续执行。
“可调度线程池”的适用场景:周期性地执行任务的场景。Spring Boot中的任务调度器,底层借助了JUC的ScheduleExecutorService“可调度线程池”实现,并且可以通过@Configuration配置类型的Bean。对“可调度线程池”实例进行配置,下面是一个例子:
以上为Executors中4个主要的快捷创建线程池的方法。为何JUC要提供工厂方法呢?原因是使用ThreadPoolExecutor、ScheduledThreadPoolExecutor构造器创建普通线程池、可调度线程池比较复杂,这些构造器会涉及大量的复杂参数。尽管Executors的工厂方法使用方便,但是在生产场景中被很多企业(尤其是大厂)的开发规范所禁用。
因为大厂业务请求体量不一样,虽然Executors方便但在这些特殊场景下不严谨对待容易出现无界队列资源耗尽的风险
面对特定业务场景,Executors】厂类中提供的创建线程池方法并不实用,因为阻塞队列或者线程数没有进行控制,可能会导致服务器资源被迅速占用导致不可控的生产事故
Executors创建的线程ch8i中,有些任务队列LinkedBlockingQueue的大小为Integer最大值,没有限制,比如newFixedThreadPool,有些对线程最大值没有限制,比如newScheduledThreadPool并且任务队列满了之后,会自动扩大为之前的50%这些都有将资源占用殆尽的风险
原因:底层实现使用了无界队列,在不了解原理的情况下使用,可能会造成非常严重的后果,
atfixedrate 执行时间大于间隔时间,结束后会立即执行下一次 fixeddelay 执行时间大于间隔时间结束后等待间隔时间后在执行下一次
- 6.
newWorkStealingPool
- 1.创建了一个抢占式的线程池,返回类型为
ExecutorService
- 2.其本质是调用
ForkJoinPool
类的构造方法- 3.
ForkJoinPool
是JDK7
引入的一种新线程池,同ThreadPoolExecutor
一样也继承AbstractExecutorService
抽象类- 4.
ForkJoinPool
使用一个无限队列来保存需要执行的任务,线程的数量可通过构造函数传入,如果没有传入则当前计算机可用的CPU
数量会被设置为线程数量作为默认值- 5.
ForkJoinPool
能够实现工作窃取(Work Stealing
),该线程池的每个线程中会维护一个队列来存放需要被执行的任务,当线程自身队列中的任务都执行完毕后,会从别的线程中拿到未被执行的任务并帮助执行- 6.
newWorkStealingPool
会创建一个含有足够多线程的线程池来维持相应的并行级别,通过工作窃取的方式使得多核的CPU
不会闲置,总会有活着的线程让CPU
去运行
3.线程池的标准创建方式
- 1.大部分企业的开发规范都会禁止使用快捷线程池(具体原因稍后介绍),要求通过标准构造器ThreadPoolExecutor去构造工作线程池。Executors工厂类中创建线程池的快捷工厂方法实际上是调用ThreadPoolExecutor(定时任务使用ScheduledThreadPoolExecutor)线程池的构造方法完成的。ThreadPoolExecutor构造方法有多个重载版本,其中一个比较重要的构造器如下:
// 使用标准构造器构造一个普通的线程池public ThreadPoolExecutor(int corePoolSize, // 核心线程数,即使线程空闲(Idle),也不会回收int maximumPoolSize, // 线程数的上限long keepAliveTime, TimeUnit unit, // 线程最大空闲(Idle)时长 BlockingQueue<Runnable> workQueue, // 任务的排队队列ThreadFactory threadFactory, // 新线程的产生方式RejectedExecutionHandler handler) // 拒绝策略
- 2.定时调度线程池的创建,没有可以初始化任务队列的方法,默认的任务队列初始为16,超过之后将会扩大一倍
- 2.参数进行具体介绍。
1…核心和最大线程数量
- 1.参数corePoolSize用于设置核心(Core)线程池数量,参数maximumPoolSize用于设置最大线程数量。线程池执行器将会根据corePoolSize和maximumPoolSize自动维护线程池中的工作线程,大致规则为:
- 1.当在线程池接收到新任务,并且当前工作线程数少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求,直到线程数达到corePoolSize
- 2.如果当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务排队队列已满时才会创建新线程。通过设置corePoolSize和maximumPoolSize相同,可以创建一个固定大小的线程池
- 3.当maximumPoolSize被设置为无界值(如Integer.MAX_VALUE)时,线程池可以接收任意数量的并发任务
- 4.corePoolSize和maximumPoolSize不仅能在线程池构造时设置,也可以调用setCorePoolSize()和setMaximumPoolSize()两个方法进行动态更改。
2.BlockingQueue
- 1.BlockingQueue(阻塞队列)的实例用于暂时接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。
- 2.分为有界队列和无界队列。同时有分同步队列和异步队列,同步队列是不需要队列空间的,属于无界队列
- 3.当阻赛队列满时,才会创建大于核心线程数的线程
3.keepAliveTime
- 1.线程构造器的keepAliveTime(空闲线程存活时间)参数用于设置池内线程最大Idle(空闲)时长(或者说保活时长),如果超过这个时间,默认情况下Idle、非Core线程会被回收(强调的是“空闲的非核心线程”超时会被回收,核心线程空闲超时后默认不会被回收,除非调用allowCoreThreadTimeOut(boolean)方法,并传入了参数true)
- 2.如果池在使用过程中提交任务的频率变高,也可以调用方法setKeepAliveTime(long,TimeUnit)进行线程存活时间的动态调整,可以将时长延长。如果需要防止Idle线程被终止,可以将Idle时间设置为无限大,具体如下:
setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
- 3.默认情况下,Idle超时策略仅适用于存在超过corePoolSize线程的情况。但若调用了allowCoreThreadTimeOut(boolean)方法,并且传入了参数true,则keepAliveTime参数所设置的Idle超时策略也将被应用于核心线程。
4.向线程池提交任务的两种方式
- 向线程池提交任务的两种方式大致如下:方式一:调用execute()方法,例如:
//Executor 接口中的方法void execute(Runnable command);
- 方式二:调用submit()方法,例如:
//ExecutorService 接口中的方法<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);
- 以上的submit()和execute()两类方法的区别在哪里呢?大致有以下三点:
- 1.二者所接收的参数不一样Execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数。Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果。Callable是JDK 1.5加入的执行目标接口,作为Runnable的一种补充,允许有返回值,允许抛出异常。Runnable和Callable的主要区别为:Callable允许有返回值,Runnable不允许有返回值;Runnable不允许抛出异常,Callable允许抛出异常
- 2.submit()提交任务后会有返回值,而execute()没有execute()方法主要用于启动任务的执行,而任务的执行结果和可能的异常调用者并不关心。submit()方法也用于启动任务的执行,但是启动之后会返回Future对象,代表一个异步执行实例,可以通过该异步执行实例去获取结果
- 3.submit()方便Exception处理execute()方法在启动任务执行后,任务执行过程中可能发生的异常调用者并不关心。而通过submit()方法返回的Future对象(异步执行实例),可以进行异步执行过程中的异常捕获。
- 因为FutureTask里面实现了Runnable的run方法,在run方法里面调用submit提交的Callable接口的实现类里面实现的带有返回值的call方法,并对这个方法进行了异常捕获,并进行记录,放到了outcome对象里面,并且记录了线程状态信息,这样就可以捕获异常处理了
- 只是方法声明不能声明受检异常,但方法内部是可以throw异常的。见本下节最后一个例子
1.通过submit()返回的Future对象获取结果
- 1.submit()方法自身并不会传递结果,而是返回一个Future异步执行实例,处理过程的结果被包装到Future实例中,调用者可以通过Future.get()方法获取异步执行的结果。通过submit()返回的Future对象获取异步执行结果,演示代码如下:
- 注意:future.get0会阻塞当前线程,在调用future.get0)的地方会等待future返回的结果,这时又变成了同步操作。在Java8中引入了CompletableFuture,使用它提供的API可以不用像之前那样阻塞式或轮询的获取某个异步任务的结果,CompletableFuture 会在异步任务处理完成后自动进行回调,让你可以链式的组合多个异步任务,CompletableFuture 类中提供了许多以 Async 后缀结尾的方法。通常而言,名称中不带 Async的方法和它的前一个任务一样,在同一个线程中运行。而名称以 Async 结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。
- 2.通过submit()返回的Future对象捕获异常submit()方法自身并不会传递异常,处理过程中的异常都被包装到Future实例中,调用者在调用Future.get()方法获取执行结果时,可以捕获异步执行过程中抛出的受检异常和运行时异常,并进行对应的业务处理。演示代码如下:
2.通过submit()返回的Future对象捕获异常
- submit()方法自身并不会传递异常,处理过程中的异常都被包装到Future实例中,调用者在调用Future.get()方法获取执行结果时,可以捕获异步执行过程中抛出的受检异常和运行时异常,并进行对应的业务处理。演示代码如下:
结果被保存在outcome里面,配合线程的状态信息来进行结果的导向,future.get,如果是异常状态,则说明有异常,通过吧outcome的异常强制转换并抛出。正常则直接返回结果。还有就是如果状态为没有执行完,就让调用者挂起。然后等待执行完被唤醒,拿到结果- 在ThreadPoolExecutor类的实现中,内部核心的任务提交方法是execute()方法,虽然用户程序通过submit()也可以提交任务,但是实际上submit()方法中最终调用的还是execute()方法
- 「那么为什么submit没有异常信息呢?因为submit是将任务封装成了一个futureTask,然后这个futureTask被封装成worker,在woker的run方法里面,最终调用的是futureTask的run方法,里面是直接吞掉了异常,并没有抛出异常,因此在worker的runWorker方法里面无法捕获到异常。」
3.线程池的任务调度流程
- 1.线程池的任务调度流程(包含接收新任务和执行下一个任务)大致如下
- 1.如果当前工作线程数量小于核心线程数量,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
- 2.如果线程池中总的任务数量大于核心线程池数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程(所以如果阻塞队列无界,那么最大线程数将毫无意)
- 3.当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光
- 4.在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务。
- 5.在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略。
- 6.在创建线程池时,如果线程池的参数(如核心线程数量、最大线程数量、BlockingQueue等)配置得不合理,就会出现任务不能被正常调度的问题。
- 错误示例
package com.crazymakercircle.multithread.basic.create3;// 省略importpublic class CreateThreadPoolDemo{@org.junit.Testpublic void testThreadPoolExecutor(){ThreadPoolExecutor executor = new ThreadPoolExecutor(1, //corePoolSize100, //maximumPoolSize100, //keepAliveTime 空闲保活时长TimeUnit.SECONDS, //空闲保活时长的单位new LinkedBlockingDeque<>(100));//workQueue//提交5个任务for (int i = 0; i < 5; i++){final int taskIndex = i;executor.execute(() ->{Print.tco("taskIndex = " + taskIndex);try{ //极端测试:无限制睡眠Thread.sleep(Long.MAX_VALUE);} catch (InterruptedException e){e.printStackTrace();}});}while (true){ //每隔1秒,输出线程池的工作任务数量、总计的任务数量Print.tco("- activeCount:" + executor.getActiveCount()+" - taskCount:" + executor.getTaskCount());sleepSeconds(1);}}// 省略其他}
- 以上示例创建了最大线程数量maximumPoolSize为100的线程池,仅仅向其中提交了5个任务。理论上,这5个任务都会被执行到,奇怪的是示例中只有1个任务在执行,其他的4个任务都在等待。其他任务被加入到了阻塞队列中,需要等pool-1-thread-1线程执行完第一个任务后,才能依次从阻塞队列取出执行。但是,实例中的第一个任务是一个永远也没有办法完成的任务,所以其他的4个任务只能永远在阻塞队列中等待着。由于参数配置得不合理,因此出现了以上的奇怪现象。为什么会出现上面的奇怪现象呢?因为例子中的corePoolSize为1,阻塞队列的大小为100,按照线程创建的规则,需要等阻塞队列已满,才会去创建新的线程。例子中加入了5个任务,阻塞队列大小为4(<100),所以线程池的调度器不会去创建新的线程,后面的4个任务只能等待。
- 以上示例的目的是传递两个知识点:(1)核心和最大线程数量、BlockingQueue队列等参数如果配置得不合理,可能会造成异步任务得不到预期的并发执行,造成严重的排队等待现象。(2)线程池的调度器创建线程的一条重要的规则是:在corePoolSize已满之后,还需要等阻塞队列已满,才会去创建新的线程。
4.ThreadPoolExecutor类
public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 线程池线程数的bit数,其中Integer.SIZE = 32,所以COUNT_BITS = 29private static final int COUNT_BITS = Integer.SIZE - 3;// 线程池中最大线程容量private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 接受新任务并处理排队任务private static final int RUNNING = -1 << COUNT_BITS;// 不接受新任务,而是处理已排队的任务private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接受新任务,不处理排队的任务,不中断正在进行的任务private static final int STOP = 1 << COUNT_BITS;// 所有任务都已终止,workerCount为零,转换到TIDYING状态的线程将运行terminated()钩子方法private static final int TIDYING = 2 << COUNT_BITS;// 已完成private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctl// 获取线程池地运行状态private static int runStateOf(int c) { return c & ~CAPACITY; }// 获取有效工作线程地数量private static int workerCountOf(int c) { return c & CAPACITY; }// 组装线程数量和线程池状态private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1.工作线程数量 < corePoolSize => 直接创建线程执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 工作线程数量 >= corePoolSize && 线程池处于运行状态 => 将任务添加至阻塞队列中workQueue.offer(command)if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();/*** 为什么需要double check线程池地状态?* 1.往阻塞队列中添加任务地时候,有可能阻塞队列已满,需要等待其他的任务移出队列,在这个过程中,线程池的状态可能会发生变化,所以需要doublecheck* 2.如果在往阻塞队列中添加任务地时候,线程池地状态发生变化,则需要将任务remove*/if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.// 线程池状态处于非RUNNING状态,添加worker失败if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 判断线程池中线程数量是否处于该线程池允许的最大线程数量,如果允许创建线程,则cas更新线程池中线程数量,并退出循环检查,执行下面创建线程地逻辑for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建线程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// 如果线程池处于RUNNING状态,并且线程已经启动则提前抛出线程异常启动异常if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 将线程加入已创建地线程集合,更新用于追踪线程池中线程数量largestPoolSize字段workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 启动线程执行任务if (workerAdded) {// 启动线程会调用Worker中地runWorker()来执行任务t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}final void reject(Runnable command) {handler.rejectedExecution(command, this);}final void runWorker(Worker w) {// 获取执行任务线程Thread wt = Thread.currentThread();// 获取执行任务Runnable task = w.firstTask;// 将的任务置空w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 双重检查线程池是否正在停止,如果线程池停止,并且当前线程能够中断,则中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 前置执行任务钩子函数beforeExecute(wt, task);Throwable thrown = null;try {// 执行当前任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 后置执行任务钩子函数afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 回收线程processWorkerExit(w, completedAbruptly);}}
- 1.
ThreadPoolExecutor
类是Java
提供用于创建线程池的类,ThreadPoolExecutor
继承AbstractExecutorService
抽象类,且其中封装了一个内部类Worker
作为工作线程,每个Worker
中都维持着一个Thread
- 2.
《阿里巴巴-Java开发手册》
中规定不要使用Executors
创建线程池而是通过ThreadPoolExecutor
创建,该方式可以明确线程池的运行规则,规避资源耗尽的风险- 3.
ThreadPoolExecutor
中使用AtomicInteger
类型的ctl
属性描述线程池的运行状态和线程数量,通过ctl
的高三位来表示线程池的5
种状态,低29
位表示线程池中现有的线程数量,使用最少的变量来减少锁竞争,提高并发效率
- 4.
ThreadPoolExecutor
构造函数中各个参数含义
- 1.
int corePoolSize
:线程池的核心线程数,决定是创建新的线程来处理提交的任务还是将其放入缓存队列
- 1.创建了线程池后,默认情况下线程池中并没有任何线程,而是等待有任才创建线程去执行任务
- 2.当线程池中的线程数目达到
corePoolSize
后,就会把到达的任务放到缓存队列workQueue
中- 2.
int maximumPoolSize
:线程池的最大线程数,决定是创建非核心线程来处理提交的任务还是执行拒绝策略
- 1.当线程池中的线程数目达到
corePoolSize
后,会判断线程数是否达到maximumPoolSize
- 2.当线程池中的线程数目达到
maximumPoolSize
后,就会执行拒绝策略,否则会创建非核心线程来执行缓存队列workQueue
中的任务- 3.
long keepAliveTime
:线程空闲时存活的时间
- 1.当任务执行完后,线程池中空闲线程数量增多,当超过
corePoolSize
时,非核心的线程会在指定时间keepAliveTime
被销毁- 2.默认情况下只有当线程池中的线程数大于
corePoolSize
时,keepAliveTime
才会起作用,即当线程池中的线程数大于corePoolSize
时,如果一个线程空闲的时间达到keepAliveTime
则会终止,直到线程池中的线程数不超过corePoolSize
- 3.
allowCoreThreadTimeOut
属性表示是否回收核心工作线程,默认false
表示不回收核心线程,使用allowCoreThreadTimeOut(true)
方法可以设置线程池回收核心线程,如果调用allowCoreThreadTimeOut(boolean)
方法,线程池中的线程数不大于corePoolSize
时,keepAliveTime
参数也会起作用,直到线程池中的线程数为0
- 4.
TimeUnit unit
:空闲存活时间单位,即参数keepAliveTime
的时间单位,其值是TimeUnit
枚举类- 5.
BlockingQueue<Runnable> workQueue
:任务队列,用于存放已提交但尚未被执行的任务
- 1.一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列
- 2.本质是一个阻塞队列
BlockingQueue
,用来存储等待执行的任务,线程池的排队策略与workQueue
有关- 3.阻塞队列有以下几种选择
- 1.
SynchronousQueue
:直接提交队列- 2.
ArrayBlockingQueue
:有界任务队列- 3.
LinkedBlockingQueue
:无界任务队列- 4.
PriorityBlockingQueue
:优先级队列- 6.
ThreadFactory threadFactory
:线程工厂,用于创建线程执行任务,一般用默认即可,默认使用Executors.defaultThreadFactory()
- 7.
RejectedExecutionHandler handler
:拒绝策略,当线程池中的线程数达到maximumPoolSize
时,使用某种策略来拒绝任务提交,本质是ThreadPoolExecutor
的内部类并且实现了RejectedExecutionHandler
接口
- 1.
ThreadPoolExecutor.AbortPolicy
:默认策略,丢弃任务并抛出拒绝执行RejectedExecutionException
异常- 2.
ThreadPoolExecutor.DiscardPolicy
:丢弃任务但是不抛出异常- 3.
ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面(最老)的任务然后重新尝试执行任务(重复此过程)- 4.
ThreadPoolExecutor.CallerRunsPolicy
:交由调用线程处理该任务- 5.
自定义拒绝策略
:扩展RejectedExecutionHandler
接口,自定义拒绝策略
- 8.其他可参考
https://juejin.cn/post/6987576686472593415
和https://blog.csdn.net/fighting_yu/article/details/89473175
1.直接提交队列
- 1.
SynchronousQueue
:直接提交队列,也称为同步队列- 2.
SynchronousQueue
:一个特殊的BlockingQueue
,其没有容量,每执行一个插入操作就会阻塞,需要在执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作- 3.上述结果显示当任务队列为
SynchronousQueue
,创建的线程数大于maximumPoolSize
时,直接执行了拒绝策略抛出异常- 4.使用
SynchronousQueue
队列时提交的任务不会被保存,而是马上提交执行- 5.如果用于执行任务的线程数量小于等于
maximumPoolSize
,则尝试创建新的进程,如果达到maximumPoolSize
设置的最大值,则根据设置的handler
执行拒绝策略- 6.因此该方式提交的任务不会被缓存起来而是会被马上执行,这种情况下,需要对程序的并发量有准确的评估,才能设置合适的
maximumPoolSize
数量,否则容易会执行拒绝策略
2.有界任务队列
- 1.
ArrayBlockingQueue
:有界任务队列- 2.上述结果显示当任务队列为
ArrayBlockingQueue
,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize
时,则会将新的任务加入到等待队列中- 3.若等待队列已满,即超过
ArrayBlockingQueue
初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize
设置的最大线程数量,若大于maximumPoolSize
则执行拒绝策略- 4.该方式中线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或没有达到超负荷的状态,线程数将一直维持在
corePoolSize
以下,反之当任务队列已满时,则会以maximumPoolSize
为最大线程数上限
3.无界任务队列
- 1.
LinkedBlockingQueue
:无界任务队列- 2.上述结果显示当任务队列为
LinkedBlockingQueue
,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是corePoolSize
- 3.该方式中
maximumPoolSize
参数是无效的,哪怕任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize
后就不会再增加- 4.若后续有新的任务加入,则直接进入阻塞队列等待,当使用这种任务队列模式时,一定要注意任务提交与处理之间的协调控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题
4.优先任务队列
- 1.
PriorityBlockingQueue
:优先任务队列- 2.上述结果显示当任务队列为
PriorityBlockingQueue
,除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行且线程池的线程数一直为corePoolSize
- 3.
PriorityBlockingQueue
其实是一个特殊的无界队列,其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize
的数量- 4.只不过其他队列一般是按照
先进先出
的规则处理任务,而PriorityBlockingQueue
队列可以自定义规则根据任务的优先级顺序先后执行
5.线程池的生命周期
6.线程池的优缺点
- 1.优点
- 1.
降低资源消耗
:复用已创建的线程来降低创建和销毁线程的消耗- 2.
提高响应速度
:任务到达时可以不需要等待线程的创建立即执行- 3.
提高线程的可管理性
:使用线程池能够统一的分配、调优和监控- 2.缺点
- 1.多线程会占
CPU
,使用多线程的地方并发量比较高时会导致其他功能响应很慢
7.线程池的执行流程
- 1.一个任务通过
execute(Runnable)
方法被添加到线程池,任务就是一个Runnable
类型的对象,任务的执行方法就是Runnable
类型对象的run()
方法- 2.当一个任务通过
execute(Runnable)
方法欲添加到线程池时- 3.如果此时线程池中的数量小于
corePoolSize
,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务- 4.如果此时线程池中的数量等于
corePoolSize
,但是缓冲队列workQueue
未满,那么任务被放入缓冲队列- 5.如果此时线程池中的数量大于
corePoolSize
,缓冲队列workQueue
满了并且线程池中的数量小于maximumPoolSize
,创建新的线程来处理被添加的任务- 6.如果此时线程池中的数量大于
corePoolSize
,缓冲队列workQueue
满了并且线程池中的数量等于maximumPoolSize
,则通过handler
所指定的拒绝策略来处理此任务- 7.处理任务的优先级为:核心线程
corePoolSize
、任务队列workQueue
、最大线程maximumPoolSize
,如果三者都满了,使用handler
处理被拒绝的任务- 8.当线程池中的线程数量大于
corePoolSize
时,如果某线程空闲时间超过keepAliveTime
线程将被终止,这样线程池可以动态的调整池中的线程数