当前位置: 首页 > news >正文

线程池夺命十四问

目录

一:什么是线程池

二:线程池有什么好处

三:如何创建一个线程池

Executors

ThreadPoolExecutors

四:创建一个线程池为什么不推荐使用Executors

五:如何设置线程池的大小 

六:线程池有哪些参数

七:线程池有哪些状态 

八:线程池的执行流程

九:如何判断一个线程池的中的任务是否执行完毕

十:线程池的拒绝策略有哪些

十一:线程池的线程是如何复用的

十二:线程池中的阻塞队列有哪些

十三:线程池中的任务出现异常后,是复用还是销毁呢

十四:如何关闭一个线程池


一:什么是线程池

线程池:顾名思义就是一个管理线程的容器,当有任务需要处理的时候,放进任务队列里面,由线程池分配空闲的线程处理任务,处理完任务的线程不会被销毁,而是在线程池中等待下一个任务

二:线程池有什么好处

1.降低资源消耗: 频繁的创建与销毁线程,占用大量资源,线程池的出现避免了这种情况,减少了资源的消耗;
2.提高响应速度: 因为线程池中的线程处于待命状态,有任务进来无需等待线程的创建就能立即执行(前提是有空闲线程,任务量巨大,还是需要排队的哈);
3.更好的管理线程: 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

三:如何创建一个线程池

在Java中有两类方法可以创建一个线程池,如下:

Executors

1.Executors.newFixedThreadPool:创建一个固定大小的线程池,并且可以控制并发线程的数量,多余的线程会放在任务队列中等待执行

2.Executors.newCacheThreadPool:创建一个可缓存的线程池,若线程数量超过了线程池设置的参数,会缓存一段时间后回收

3.Executors.newSingleThreadPool:创建单个线程的线程池,他可以保证线程的顺序执行

4.Executors.newScheduledThreadPool:创建可以执行延迟任务的线程池

5.Executors.newSingleThreadScheduleExecutors:创建一个单线程可以执行延迟任务的线程池,是3和4的结合体

6.Executors.newWorkStealingPool:创建一个抢占式的线程池(是根据CPU的核数来确定的)


ThreadPoolExecutors

7.ThreadPoolExecutors:通过手动创建线程池,需要配置参数


四:创建一个线程池为什么不推荐使用Executors

如果大家跟入到Executors这些方法的底层实现中去看一眼的话,立马就知道原因了,像FixedThreadPool 和 SingleThreadExecutor这两个方法内使用的是无界的 LinkedBlockingQueue存储任务,任务队列最大长度为 Integer.MAX_VALUE,这样可能会堆积大量的请求,从而导致 OOM。 而CachedThreadPool使用的是同步队列 SynchronousQueue, 允许创建的线程数量也为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM,其他的方法所提供的均是这种无界任务队列,在高并发场景下导致OOM的风险很大,故大部分的公司已经不建议采用Executors提供的方法创建线程池了。

五:如何设置线程池的大小 

通常来说设置线程数是根据业务需求和实际情况来确定的,原因如下

1.如果当前业务需求的任务类型是多IO类型的(读和写操作比较多)话,线程数可以设置的多一些(2n-1),如果业务需求的任务类型是多cpu计算型的话,那么线程数可以设置为n-1

2.线程数还与cpu的核心数有关,因为我们创建的线程池中本质还是多个线程,而Java线程是与操作系统一一对应,因此也要考虑cpu的情况

六:线程池有哪些参数

在线程池中有七大参数

1.核心线程数:即当前线程池种常驻的线程数量,如果是0的话,则会销毁线程池

2.最大线程数:当有最多任务的时候,该线程池中所有的线程数量

3.临时线程存活时间:如果线程池空闲,除了核心线程以外的临时线程,在超过改时间后会消亡

4.临时线程存活时间单位:临时线程的存活时间单位,一般为毫秒

5.任务阻塞队列:当线程池中所有的线程都在执行任务时,又有新的任务来时,会放进任务阻塞i队列中。

6.线程工厂:是一个创建线程池的工厂,不设置该参数时,用的是默认的线程工厂

7.拒绝策略:当任务队列都已经满了的时候,此时就会触发拒绝策略。表示不在接收新的任务

七:线程池有哪些状态 

通常线程池有以下5中状态

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING: 线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。

  • SHUTDOWN: 不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。

  • STOP: 不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。

  • TIDYING: 1)SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。

    2)线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。

    3)线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态。

  • TERMINATED: 线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。

5种状态转换如下:

八:线程池的执行流程

1、刚new出来的线程池里默认是没有线程的,只有一个传入的阻塞队列;

2、当我们执行execute提交一个方法后,会判断当前线程池中线程数是否小于核心线程数(corePoolSize),如果小于,那么就直接通过 ThreadFactory 创建一个线程来执行这个任务,当任务执行完之后,线程不会退出,而是会去阻塞队列中获取任务;

3、如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。

4、如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。

5、如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,调用RejectedExecutionHandler.rejectedExecution()方法。

源码如下:

public void execute(Runnable command) {// 首先检查提交的任务是否为null,是的话则抛出NullPointerException。if (command == null)throw new NullPointerException();// 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)//private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));int c = ctl.get();// 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)if (workerCountOf(c) < corePoolSize) {// 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务// addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程if (addWorker(command, true))return;// 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化c = ctl.get();}// 2. 尝试将任务添加到任务队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 双重检查线程池的状态if (! isRunning(recheck) && remove(command))  // 如果线程池已经停止,从队列中移除任务reject(command);// 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务else if (!addWorker(command, false))// 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务reject(command);
}

九:如何判断一个线程池的中的任务是否执行完毕

在Java中通常有两种方式判断线程池中线程是否已经执行完毕

1.使用getTakCount总任务数和getCompletedTaskCount已经执行完毕的线程数相比较来判断,该方法缺点是在实际开发中线程池是公用的,因此总任务数是在一直不断变化的,并且得到的也是一个近似值,因此不推荐使用

2.使用FutureTask中的get()方法是等待所有线程执行完毕后才返回结果。

十:线程池的拒绝策略有哪些

在Java中有两类拒绝策略:内置的拒绝策略,自定义拒绝策略

1.在内置的拒绝策略中有四种:

1.AbortPolicy:即默认的拒绝策略,只会抛出一个异常消息

2.CallerRunsPolicy:即哪个线程提交过来的任务,就返回给哪个线程,是不会用线程池中的线程处理

3.DiscardPolicy:即默默丢弃任务,不会做任何的提醒操作

4.DiscardOldestPolicy:即丢弃任务队列中最旧的一个任务,并尝试重新提交任务

自定义拒绝策略

 

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @author: dlwlrma* @data 2024年09月17日 19:47* @Description: TODO:模拟自定义线程池拒绝策略*/
public class ThreadDemo {public static void main(String[] args) {Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("线程名称:"+Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}}};//创建线程,线程的任务队列长度为1ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {//自定义线程池拒绝策略System.out.println("执行自定义拒绝策略");}});threadPool.execute(runnable);threadPool.execute(runnable);threadPool.execute(runnable);threadPool.execute(runnable);
}
}

实际开发中,我们不会使用内置的拒绝策略,是因为当内置的拒绝策略过于简单,不利于维护,因此常用自定义的拒绝策略,例如在自定义的拒绝策略中加入mq,当触发了拒绝策略时,发布者会收集异常信息,然后订阅者会根据异常id去处理异常等等

十一:线程池的线程是如何复用的

线程池的核心功能就是实现线程的重复利用,那么线程池是如何实现线程的复用呢? 线程池通过addWorker()方法添加任务,而在这个方法的底层会将任务和线程一起封装到一个Worker对象中,Worker 继承了 AQS,也就是具有一定锁的特性。

然后Worker中有一个run方法,执行时会去调用runWorker()方法来执行任务,我们来看一下这个方法的源码:

final void runWorker(Worker w) {// 获取当前工作线程Thread wt = Thread.currentThread();// 从 Worker 中取出第一个任务Runnable task = w.firstTask;w.firstTask = null;// 解锁 Worker(允许中断)w.unlock(); boolean completedAbruptly = true;try {// 当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行while (task != null || (task = getTask()) != null) {// 锁定 Worker,确保在执行任务期间不会被其他线程干扰w.lock();// 如果线程池正在停止,并确保线程已经中断// 如果线程没有中断并且线程池已经达到停止状态,中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 在执行任务之前,可以插入一些自定义的操作beforeExecute(wt, task);Throwable thrown = null;try {// 实际执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行任务后,可以插入一些自定义的操作afterExecute(task, thrown);}} finally {// 清空任务,并更新完成任务的计数task = null;w.completedTasks++;// 解锁 Workerw.unlock();}}completedAbruptly = false;} finally {// 工作线程退出的后续处理processWorkerExit(w, completedAbruptly);}
}

在这段源码中我们看到了线程被复用的原因了,就是这个while的循环,当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行;如果从 getTask 获取不到方法的话,就会调用 finally 中的 processWorkerExit 方法,将线程退出。

十二:线程池中的阻塞队列有哪些

 常见的阻塞队列有以下几种

// 1、无界队列 LinkedBlockingQueue,容量Integer.MAX_VALUE
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}// 1、无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 2、同步队列 SynchronousQueue,没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务,因此线程最多可创建Integer.MAX_VALUE个。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}// 3、DelayedWorkQueue(延迟阻塞队列),添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

十三:线程池中的任务出现异常后,是复用还是销毁呢

这个问题我们要分两种情况去分析,第一种是通过 execute() 提交任务时,在执行过程中抛出异常,且没有在任务内被捕获,当前线程会因此终止,异常信息会记录在日志或控制台中,并且线程池会移除异常线程,重新创建一个线程补上去。

第二种通过submit()提交任务时,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。

通过submit()的底层源码发现,其实它的内部封装的是execute方法,只不过它的任务被放在了RunnableFuture对象里。

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

execute方法会抛出异常终止线程的,为什么submit中不会呢,那么肯定在RunnableFuture里,经过一顿跟踪发现,这个Future中实现的run方法,对异常进行了捕获,所以并不会往上抛出,也就不会移除异常线程以及新建线程了。​​​​​​​

十四:如何关闭一个线程池

在JDK 1.8 中,线程池的停止一般使用 shutdown()、shutdownNow()这两种方法。

1.shutdown

public void shutdown() {final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁mainLock.lock(); // 加锁以确保独占访问try {checkShutdownAccess(); // 检查是否有关闭的权限advanceRunState(SHUTDOWN); // 将执行器的状态更新为SHUTDOWNinterruptIdleWorkers(); // 中断所有闲置的工作线程onShutdown(); // ScheduledThreadPoolExecutor中的挂钩方法,可供子类重写以进行额外操作} finally {mainLock.unlock(); // 无论try块如何退出都要释放锁}tryTerminate(); // 如果条件允许,尝试终止执行器
}

 在shutdown的源码中,会启动一次顺序关闭,在这次关闭中,执行器不再接受新任务,但会继续处理队列中的已存在任务,当所有任务都完成后,线程池中的线程会逐渐退出。

2.shutdownNow

public List<Runnable> shutdownNow() {List<Runnable> tasks; // 用于存储未执行的任务的列表final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁mainLock.lock(); // 加锁以确保独占访问try {checkShutdownAccess(); // 检查是否有关闭的权限advanceRunState(STOP); // 将执行器的状态更新为STOPinterruptWorkers(); // 中断所有工作线程tasks = drainQueue(); // 清空队列并将结果放入任务列表中} finally {mainLock.unlock(); // 无论try块如何退出都要释放锁}tryTerminate(); // 如果条件允许,尝试终止执行器return tasks; // 返回队列中未被执行的任务列表
}

 与shutdown不同的是shutdownNow会尝试终止所有的正在执行的任务,清空队列,停止失败会抛出异常,并且返回未被执行的任务列表。


http://www.mrgr.cn/news/29070.html

相关文章:

  • NoETL
  • 【 ElementUI 组件Steps 步骤条使用新手详细教程】
  • STM32(hal库)在串口中,USART和uart有什么区别?
  • 使用ookii-dialogs-wpf在WPF选择文件夹时能输入路径
  • HTTP 客户端怎么向 Spring Cloud Sleuth 传输跟踪 ID
  • Request和Response
  • 560. 和为 K 的子数组
  • Maya---机械模型制作
  • vs2022快捷键异常解决办法
  • 《Google软件测试之道》笔记
  • 大厂校招:唯品会Java面试题及参考答案
  • 力扣题解815
  • 星火AI-智能PPT生成 API 文档
  • Python 课程15-PyTorch
  • SAP到底是谁的系统?business or IT?
  • IDEA 2024.3 EAP新特征早览!
  • 电脑的固态硬盘
  • 53 最大子数组和
  • 【FreeRL】Rainbow_DQN的实现和测试
  • AI教你学Python :详解Python元组与集合、字典基础和字符串操作(补充)
  • 学成在线练习(HTML+CSS)
  • Spring 源码解读:手动实现Environment抽象与配置属性
  • 【前端】prop传值的用法
  • 等保测评:企业如何选择合适的测评机构
  • Vue特性
  • C++11新增特性:lambda表达式、function包装器、bind绑定