当前位置: 首页 > news >正文

爆肝线程池底层源码,高并发精进之路!

    线程池一直是开发中的热门组件,在并发访问,异步调用等方面屡试不爽,有没有,在下班回家的路上,夜深人静的夜里,有没有想过,线程池的底层是如何运作的?七大参数在底层是如何关联的?线程池的execute()里面到底干了哪些事?线程池里面有哪些坑?等等一系列问题,或者你在了解底层源码之后可以做很多扩展,比如动态线程池,强烈建议大家花几分钟时间看看下面的线程池底层源码分析,可能会给您带来不一样的收获!加油,探索才会让我们成长。

     并且呢,之后我们会陆续聊到定时线程池,TransmittableThreadLocal,Future等等一些异步源码和常见的坑,让大家可以少走弯路,能够写出优雅,高性能代码,话不多说,回到正题,今天先把线程池源码聊明白了!!冲就完了

文章目录

  • 1. 为什么要使用线程池
  • 2. 线程底层架构
  • 3. 线程池里面worker源码
    • 3.1 简介
    • 3.2 woker类的理解
    • 3.3 Worker的关键源码分析
    • 3.4 核心方法getTask()
    • 3.5 processWorkerExit()
    • 3.6 Worker到底怎么处理异常
  • 4. 线程池的锁
    • 4.1 mainLock锁
    • 4.2 Worker实现的AQS锁
      • 4.2.1 Worker锁有哪些特性
      • 4.2.2 Worker锁要什么要设计成不可重入
  • 5. 线程池是如何保证核心线程一直存活

1. 为什么要使用线程池

在实际使用中,线程是很占用系统资源的,如果对线程管理不善很容易导致系统问题。 因此,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有如下好处:

  1. 降低资源消耗

通过复用已存在的线程和降低线程关闭的次数来尽可能降低系统性能损耗

  1. 提升系统响应速度

通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度

  1. 提高线程的可管理性

线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源, 还会降低系统的稳定性,甚至导致系统崩溃,因此,需要使用线程池来管理线程。

大家熟知的七大参数"corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,拒绝策略 handler",兄弟们,这个我就不展开聊7个参数含义了,这个我感觉大家基本都会,直接上干货!!线程池底层源码架构走起!!

2. 线程底层架构

那这这种架构是怎么运行呢?请继续往下看:

  1. 线程池的核心线程和最大线程在线程池内部主要以worker的形式存在,其实也就是说worker是真正干活的那个类。
  2. 当主线程去提交任务的时候会去判断当前的worker数量是否小于设置的核心线程数,如果小于,会创建worker去执行任务,如果大于核心线程数,小于最大线程数,会放到等待执行队列,如果队列满了,则会继续创建worker。
  3. 那这块有个问题,我们之前都知道最大线程有一个存活时间,如果存活时间到了,此时任务阻塞队列里面没有任务,就会消失,那这块底层机制到底是如何运行的呢?其实当线程内部创建的worker大于核心线程数时,此worker执行完任务之后,会阻塞监听任务队列,并且会设置超时时间,那此时的超时时间就是最大线程数的存活时间keepAliveTime,如果在超时时间内未获取到任务,这个worker就会关闭。

上面的这些文字第一眼看起来比较晦涩,但是大家可以参照这个思路去看下源码,你一定会有所收获!!!

坚持住,我们去看下线程池里面最重要的工作类worker的源码!!!

3. 线程池里面worker源码

3.1 简介

   <font style="color:#000000;">Worker是ThreadPoolExecutor中的内部类,它其实就可以相当于线程池中存储的线程,用来执行提交给线程池的任务,worker其实也不算是一个纯粹的线程,因为线程只是它的一个内部的属性,在内部还有一些其他的属性,来统计一些相关的任务信息。</font>

3.2 woker类的理解

  ![](https://cdn.nlark.com/yuque/0/2023/png/21591941/1703410421813-3c6c6bcc-7429-4a71-aa00-bcb1cd883927.png)

Worker的属性:

(1) firstTask :用于保存线程池提交的任务

(2) thread :worker工作线程的引用,实际处理任务的角色就是worker.

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//使用threadFactory来创建线程this.thread = getThreadFactory().newThread(this);
}}

Worker继承了AQS,使用AQS来实现独占锁,并且是不可重入也就是说Worker对象本身就有lock()、unlock()、tryLock()、isLocked()等方法可以调用,用来给Worker对象加锁,那这些锁是干嘛用的,别急,先把worker源码看完之后,我们都会去聊到。

3.3 Worker的关键源码分析

当你去执行线程池执行任务的时候:

public void execute(Runnable command) {
if (command == null)throw new NullPointerException();
//获取worker的数量
int c = ctl.get();
//如果woker的数量小于核心线程数
if (workerCountOf(c) < corePoolSize) {//增加workerif (addWorker(command, true))return;c = ctl.get();
}
//如果worker的数量大于核心线程数,加入任务阻塞队列
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);
}
//如何任务队列已满,继续增加worker,如果此时已经大于最大核心线程数,会触发决绝策略
else if (!addWorker(command, false))reject(command);
}

那addWorker的源码逻辑是怎样呢,继续往下看!!,坚持住

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//根据core的值,如果workeCount的数量大于核心线程数或者最大线程数时,不会在去//创建worker。if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//创建worker,并且会把Worker放到workers的集合set中,//由于set的类型是HashSet是线程不安全的,所以在外层加锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

你会发现,启动worker是通过t.start(),那他从源码角度是具体如何启动起来的呢?继续往下看!!!

其实在w = new Worker(firstTask);创建worker的时候,会在worker内部创建线程,如下:

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//会创建内部线程,并且把worker作为任务传进来,此时worker一定是继承Runnablethis.thread = getThreadFactory().newThread(this);
}

从上面会可以看出,会在内部创建一个新的线程,并且把worker作为任务传进来,所以在调用t.start()时候,实际上会调用woker的run()方法。

public void run() {//会调用runWorker方法runWorker(this);
}

那接下来我们重点看下runWorker做了哪些操作:

// 向线程池中添加线程成功,并且启动也成功,则会执行Worker对象的run方法
final void runWorker(Worker w) {
//这个线程就是worker持有的线程
Thread wt = Thread.currentThread();
//在构建worker传进来的任务,其实就是刚刚客户端提交的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {while (task != null || (task = getTask()) != null) {//给worker进行加锁,因为当线程池调用shutdown时,会通过尝试加锁来判断//任务是否正在执行w.lock();//判断线程是否属于stop后的状态if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//在执行任务的run方法前执行beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 扩展使用,在执行任务的run方法之后执行afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// 正常执行完任务completedAbruptly = false;
} finally {// 线程池中所有的任务都处理完后,或者执行任务的过程中出现了异常,就会执行该方法processWorkerExit(w, completedAbruptly);
}
}

从上面方法可以看出,会有一个while循环,在这个while循环里通过getTask()会一直的获取任务,获取完任务之后进行执行,那接下来,我们再去瞅瞅getTask()的逻辑。

3.4 核心方法getTask()

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;//从队列里面成功后任务runnable
private Runnable getTask() {//记录上一次获取任务是否超时boolean timedOut = false; // Did the last poll() time out?for (;;) {// 这一部分是判断线程池状态// 获取线程池的状态和线程池中线程数量组成的整形字段,32位// 高3位代表线程池的状态,低29位代表线程池中线程的数量int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.// 线程池状态>= SHUTDOWN状态 && (线程状态 >=STOP 状态,或者任务队列为空)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();//返回空return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了      // setMaximumPoolSize方法;// timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次 // 从阻塞队列中获取任务发生了超时// 接下来继续判断,如果有效线程数量大于1,或者阻塞队列是空的,// 那么在满足上面的情况下,会尝试将workerCount减1;并且返回空if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 获取任务// 如果timed = true ,说明大于核心线程数,说明需要做超时控制,则根据 // keepAliveTime设置的时间内,阻塞等待从队列中获取任务// 如果timed = false,说明不需要做超时控制,则阻塞,             // 直到从队列中获取到任务为止// keepAliveTime正好对应于我们最大线程数的超时时间Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//在允许的时间内会没有获取到任务timedOut = true;} catch (InterruptedException retry) {//相关于在监听队列时被中断,又会进入getTask循环timedOut = false;}}
}

这块其实有一个问题大家可以考虑下:当执行任务的时候,任务报错了,那在这种情况下,worker应该如何处理呢?继续往下看

3.5 processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果 completedAbruptly = true ,则线程执行任务的时候出现了异常,需要从线程池中减少一个线程// 如果 completedAbruptly = false,则执行getTask方法的时候已经通过compareAndDecrementWorkerCount减1,这里无需在进行减1操作if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//尝试是否要去结束线程池tryTerminate();// -当线程池是RUNNING或SHUTDOWN状态时// --如果worker是异常结束(即completedAbruptly为true),那么会直接addWorker;// ---如果allowCoreThreadTimeOut = true,并且等待队列有任务,至少保留一个worker;// ---如果allowCoreThreadTimeOut = false,活跃线程数不少于corePoolSizeint c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {// 根据allowCoreThreadTimeOut的值,来设置线程池中最少的活跃线程数是0还是corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果等待队列中有任务,要至少保留一个workerif (min == 0 && ! workQueue.isEmpty())min = 1;// 如果活跃线程 > 最小核心线程数,就不会再次启动了if (workerCountOf(c) >= min)//大于核心线程数的线程会关闭return; // replacement not needed}//假设为异常的情况下,这块会重新添加workeraddWorker(null, false);}
}

上面的代码主要干了一件事:当执行任务发生异常的时候,此时worker内部的线程会由于异常而中断,因此需要再次重启一个worker。

3.6 Worker到底怎么处理异常

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;
} finally {processWorkerExit(w, completedAbruptly);
}
}

通过源代码可以看出,对应的Exception都是保存在thrown中,在finally中交给了afterExecute进行了处理。

但是有个问题,如果我们不实现这个方法时,会导致我们的异常被吞掉。

所以可以自己实现对应的afterExecute来进行处理系统内部发生的异常问题。

你以为这就完了??NO,NO,NO,必须得继续,我们去聊一下源码里面的重要特性!!

4. 线程池的锁

4.1 mainLock锁

mainLock

锁类型:ReentrantLock

使用时机:在创建线程对象Worker的时候,会把Worker放到workers的集合set中,由于set的类型是HashSet是线程不安全的,所以在外层加锁,保证worker在添加时候是线程安全的,不会相互覆盖。

4.2 Worker实现的AQS锁

4.2.1 Worker锁有哪些特性

在理解worker之前我们要看下什么是AQS,可以参考

https://www.yuque.com/likang-qinll/isegnd/ymvg79ewrpxsl116?singleDoc# 《AQS》

具体使用的地方是Worker去执行任务的时候

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {//加锁,用worker类本身来加锁w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {//真正去执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

点进来这个lock方法,看一下,实际会调用acquire方法,从AbstractQueuedSynchronizeracquire方法

看下,会发现调用了Worker的tryAcquire方法,此时你会发现一个问题:在这个方法里面并内有判断当前线程是否为已经获取锁的线程,说明Worker这个锁不具备可重入的特性。

//
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{public void lock()        { acquire(1); }protected boolean tryAcquire(int unused) {//这块并没有比较是否是原线程,if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}}public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
}

4.2.2 Worker锁要什么要设计成不可重入

那问题来了,那么为什么worker要设计成不可重入锁,我们可以再去看下线程池的shutDown方法:


public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//设置线程池的状态advanceRunState(SHUTDOWN);//中断空闲的workerinterruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}
//中断空闲的worker
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {//这个其实就是控制worker运行的线程,可以通过 t.interrupt()来中断wo //rkerThread t = w.thread;//很重要(怎么判断你是否空闲的worker),如何去判断和中断空闲线程//判断当前的线程不是已被中断切尝试加锁,如果有任务运行,那必然w.tryLock()//返回false.if (!t.isInterrupted() && w.tryLock()) {try {//当worker空闲时,会打上中断的标记t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}

所以发现,正是设计了不可重入锁,所以才在shutDown方法执行时,不会打断正在执行的任务。

5. 线程池是如何保证核心线程一直存活

其实在底层是利用阻塞队列的特性,当核心线程执行完任务之后,底层用take去阻塞等待从队列中获取任务,此时如果再有新的任务进来,会直接放到队列之中,那这样核心线程就可以获取任务去执行。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

在这里插入图片描述


http://www.mrgr.cn/news/75264.html

相关文章:

  • Colossal-AI:深度学习大规模分布式训练框架
  • 在 Azure 100 学生订阅中新建一台 Ubuntu VPS,并通过 Docker 部署 Nginx 服务器
  • 实战开发:基于用户反馈筛选与分析系统的实现
  • js前序遍历等
  • 【从0带做】基于Springboot3+Vue3的高校食堂点餐系统
  • jQuery UI 主题
  • 相交链表
  • 曹操为什么总是亲征
  • 1050 String Subtraction (20)
  • C++笔记
  • 多模态大模型(2)--BLIP
  • 【电子设计】按键LED控制与FreeRTOS
  • NGUI————按钮练习题
  • Towards Reasoning in Large Language Models: A Survey
  • Spring加载流程,Springboot自动装配原理
  • android开发
  • exo - 使用日常设备运行AI集群
  • 2024年09月CCF-GESP编程能力等级认证Python编程一级真题解析
  • 微信小程序-prettier 格式化
  • Diffusion Policy——斯坦福机器人UMI所用的扩散策略:从原理到其编码实现(含Diff-Control、ControlNet详解)
  • leetcode hot100【LeetCode 105.从前序与中序遍历序列构造二叉树】java实现
  • Web性能优化:从基础到高级
  • 二叉树的遍历(手动)
  • 一文了解Android的核心系统服务
  • 不宽的宽字符
  • 面试中如何回答“怎样实现 RPC 框架”的问题?