当前位置: 首页 > news >正文

剖析源码,带你看懂JUC线程池运行机制

文章目录

    • 1.线程池基础
      • 1.1 概念
      • 1.2 作用
      • 1.3 创建方式
        • 1.3.1 构造方法实现
        • 1.3.2 Executors类静态方法实现
      • 1 .4 类型
        • 1.4.1 按线程池数目
        • 1.4.2 按线程池功能
    • 2.线程池工作原理
      • 2.1 任务提交 execute
      • 2.2 增加工作线程addWorker方法
      • 2.3 工作线程运行原理runWorker方法
    • 3.扩展补充
      • 3.1 拒绝策略
      • 3.3shutdown 和shutdownNow 的区别
      • 3.4 与ForkJoinPool区别
    • 4.小结

1.线程池基础

1.1 概念

	借助池化思想,预先构建指定数目的线程用以处理系统任务,其中若干个线程形成一个线程池。池中通常会维护一定数目的线程来处理系统任务,也会根据执行任务的繁忙程度,动态扩增/缩减线程数目,具备动态调整处理系统任务速度的能力。

1.2 作用

1.避免线程频繁创建和销毁,提前创建指定数目的线程用以线程复用降低资源开销以及提升系统响应速度
2.便于对线程资源限制和统一管理,还可以根据任务情况动态实现增减线程提升资源利用率。【这个作用面试经常问】

1.3 创建方式

JDK中创建线程池使用下面这个构造方法来实现的,除了工作窃取线程ForkJoinPool

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) 

corePoolSize:核心线程数。 //线程池中一直存在的线程》
maximumPoolSize: 最大线程数。//线程池中最多允许线程
keepAliveTime: 线程空闲存活时间。 //多余空闲线程回收最长等待时间
unit: 时间单位。
workQueue: 工作队列【实现BlokingQueue接口】。 // 存待处理任务
threadFactory: 线程工厂实例
handler:任务队列满了的拒绝策略

1.3.1 构造方法实现

比如new ThreadPoolExecutor() 实现,自定义设置参数,功能灵活,适用于复杂业务场景自定义实现。对使用者要求较高。

1.3.2 Executors类静态方法实现

如下图所示,使用简单,可调整参数少,提供了很多默认实现,适用于场景业务简单。

// 固定线程池,最大和最小线程数相等都是 nThreads
Executors.newFixedThreadPool(int nThreads)
// parallelism 并行处理数,可以不传,默认就是
// 这个Runtime.getRuntime().availableProcessors() CPU处理核数。
Executors.newWorkStealingPool(int parallelism)
// 最大线程数为Integer.MAX_VALUE,最小线程为0,使用很少。
// 线程太多CPU直接跑慢。
Executors.newCachedThreadPool()
// 定时任务线程池,最大线程数为Integer.MAX_VALUE。
Executors.newScheduledThreadPool(int corePoolSize)
// 单个线程构成的线程池
Executors.newSingleThreadExecutor()

1 .4 类型

1.4.1 按线程池数目

1.单线程线程池 newSingleThreadExecutor
2.固定线程线程池 newFixedThreadPool
3.无限线程线程池 newCachedThreadPool

1.4.2 按线程池功能
  1. 普通线程池ThreadPool
    // 上面三种都是普通线程池,通常用于处理IO密集任务,比如大量http请求等
  2. 工作窃取ForkJoinPool
    // 主要用来处理CPU密集型任务,使用少量的线程并行处理大量的计算型任务。
  3. 定时任务线程池scheduledThreadPool
    // 处理延时任务或者是定期执行的任务。比如日志定期删除,缓存定期删除,消息定时发送等

2.线程池工作原理

多线程处理任务大致流程是,首先要构建线程池,其次提交任务给线程池,线程池接收任务,则自动会创建工作线程并运行,来处理任务,如果任务太多,处理不过来则会通过新建工作线程或者是将任务放如工作队列中。其中最为重要的几个方法分别是,提交任务execute,新建工作线程addWorker,工作线程运行 runWorker 这三个方法。

2.1 任务提交 execute

通过JDK21源码来剖析执行机制

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();// ctl使用AtomicInteger 存储的对象,高3为存储线程状态,低29为存储状态下响应线程数  int c = ctl.get();// 按位与操作计算c低29位的值 得到当前线程数// 如果当前线程池小于核心线程池则新建线程成功则结束,未成功继续执行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 线程正在运行 让工作队列加任务,且添加成功if (isRunning(c) && workQueue.offer(command)) {// 二次检查int recheck = ctl.get();// 线程池不是运行状态 则删除任务。有可能存在使用者直接shutdownNow/shutdown 方法直接停止线程池。if (! isRunning(recheck) && remove(command))// 拒绝策略。reject(command);// 如果当前线程为0,可能存在重新启动情况线程刚好为空。else if (workerCountOf(recheck) == 0)// 新增工作线程addWorker(null, false);}// 工作队列满了,新增工作线程,新增失败则执行拒绝策略。else if (!addWorker(command, false))reject(command);}

2.2 增加工作线程addWorker方法

JDK21源码分析,

/*** RUNNING -> SHUTDOWN*    On invocation of shutdown()* (RUNNING or SHUTDOWN) -> STOP*    On invocation of shutdownNow()* SHUTDOWN -> TIDYING*    When both queue and pool are empty* STOP -> TIDYING*    When pool is empty* TIDYING -> TERMINATED*    When the terminated() hook method has completed*private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;
**/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// Check if queue empty only if necessary.// 当线程池处为空并且工作队列为空,此时不增加线程池。// 可结合上面注释代码看懂。if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {// core变量为false 情况下,超过最大线程数直接失败。if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// cas 机制,只有一个线程成功    if (compareAndIncrementWorkerCount(c))break retry;// c = ctl.get();  // Re-read ctl// cas 失败 重试。if (runStateAtLeast(c, SHUTDOWN))continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个工作线程Worker对象w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 获取锁 可能存在多个线程调用了execute方法。mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();// 判断是否是运行状态或者是重启状态if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {// 如果不是新建线程状态抛异常if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();// 增加至workers工作线程hashSet实现。workers.add(w);workerAdded = true;int s = workers.size();// 这工作线程数目则重新赋值。这个是计算当前线程池实际运行的最大线程池数if (s > largestPoolSize)largestPoolSize = s;}} finally {//释放锁mainLock.unlock();}if (workerAdded) {// 底层还是调用Thread.start方法来开启线程container.start(t);workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

2.3 工作线程运行原理runWorker方法

通过源码来分析:

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 当前任务或者工作队列任务不为空。循环处理while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 任务执行前逻辑,这个默认实现为空。// JUC大佬采用模板模式,先给开发人员占个位置,让开发者可自定义实现。beforeExecute(wt, task);try {// 执行任务task.run();// 执行后逻辑,方法体为空。交由开发人员自定制实现afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {// 任务清空 GC回收task = null;// 线程处理任务数加一w.completedTasks++;//  释放锁w.unlock();}}completedAbruptly = false;} finally {// 工作线程回收,如果回收后,发现当前线程小于核心线程,会继续创建一个线程加入线程池中。processWorkerExit(w, completedAbruptly);}}

3.扩展补充

3.1 拒绝策略

JUC默认四种策略
AbortPolicy: 直接抛异常。默认实现策略。
DiscardPolicy: 默默丢弃,不抛异常。
DiscardOldestPolicy: 使用poll丢弃一个任务。
CallerRunsPolicy: 交由调用者所在线程执行。

自定义拒绝策略:只需要实现RejectedExecutionHandler 这个接口重写rejectedExecution 方法接口即可。一个简单示例如下:
比如com.zaxxer.hikari.pool.HikariPool 实现了自定义注册策略
在这里插入图片描述

3.3shutdown 和shutdownNow 的区别

shutdown
执行后,线程池不会再接收新任务,但是处于工作队列的任务还是要执行完,方法立刻返回,不会等待队列任务完成返回。【异步】
shutdownNow
线程池不接受新任务,工作队列任务直接丢弃,正在执行任务会直接中断,立刻返回,返回值为队列里面被丢弃的任务列表。

3.4 与ForkJoinPool区别

1.ForkJoinPool 每个线程单独有个队列,ThreadPoolExecutor中只有一个队列。
2.ForkJoinPool 采用双端队列实现,执行任务采用LIFO顺序,窃取任务从尾部获取任务,有效减少任务竞争,当任务只剩一个时候,允许线程之间CAS来竞争获取锁。ThreadPoolExecutor采用FIFO排队方式执行任务。
3.ThreadPoolExecutor适用于IO密集型,ForkJoinPool 适用于CPU密集型,最好是非阻塞任务。

4.小结

本文通过源码分析深度剖析了JUC下线程池运行机制,并对线程池类型和使用做了简单介绍,希望能给大家带来些许帮助,如文中存在错误,可以在评论区交流指正。


http://www.mrgr.cn/news/70634.html

相关文章:

  • Swift 实现判断链表是否存在环:快慢指针法
  • 151页PDF | XX集团数字化转型SAP项目规划方案(限免下载)
  • 表格数据处理中大语言模型的微调优化策略研究
  • 线性回归模型预测房价
  • 挂壁式空气净化器哪个品牌的质量好?排名top3优秀产品测评分析
  • JMeter监听器与压测监控之 InfluxDB
  • [ Linux 命令基础 2 ] Linux 命令详解-系统管理命令
  • Qt | QMediaPlayer+QGraphicsVideoItem视频播放器
  • 无线局域网四种类型
  • 图论算法:最短路径算法详解【c语言版】(无权最短路径、Dijkstra算法)
  • Mysql 基础语法
  • Java API类与接口:类的转换方法与正则表达式
  • 自动驾驶安全方向论文阅读
  • Tomcat(9) web.xml文件的作用
  • 【英特尔IA-32架构软件开发者开发手册第3卷:系统编程指南】2001年版翻译,2-20
  • 人字齿是怎么样的一种齿轮?
  • 《黑神话:悟空》像素版v0.2重磅更新[PC+安卓]
  • 【系统架构设计师-2024下半年真题】综合知识-参考答案及部分详解(完整回忆版)
  • Java static静态变量 C语言文件读写
  • 工程师 - etc/hosts文件
  • 呼叫中心系统监控预警功能的应用
  • [ Linux 命令基础 4 ] Linux 命令详解-文本处理命令
  • (RK3566驱动开发 - 1).pinctrl和gpio子系统
  • Java 抽象类(Abstract Class)详解
  • 生产与配置
  • 前端知识点---Javascript中检测数据类型函数总结