当前位置: 首页 > news >正文

Java避坑案例 - 线程池使用中的风险识别与应对

文章目录

  • 线程池的基本概念
  • 创建线程池的注意事项
    • 实例1: `newFixedThreadPool` 使用无界队列,可能因任务积压导致 OOM
    • 实例2: `newCachedThreadPool` 会创建大量线程,可能因线程数量过多导致无法创建新线程。
  • 线程池参数设置的最佳实践
    • 线程池默认的工作行为
    • 预先启动核心线程
  • 监控线程池状态的方法
  • 线程池的混用策略

在这里插入图片描述


  1. 线程池的基本概念和使用场景
  2. 线程池创建时的注意事项,包括手动创建与使用 Executors 类的区别
  3. 案例分析 newFixedThreadPoolnewCachedThreadPool 可能引发的问题
  4. 线程池参数设置的最佳实践
  5. 监控线程池状态
  6. 线程池的混用策略

线程池的基本概念

在程序中,我们会用各种池化技术来缓存创建昂贵的对象,比如线程池、连接池、内存池。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定的策略调整池中缓存对象的数量,实现池的动态伸缩

线程池是一种管理线程的机制,通过重用线程来减少创建和销毁线程的开销,适用于处理短平快的任务。线程池的核心组成部分包括核心线程数、最大线程数、工作队列及拒绝策略。


创建线程池的注意事项

在 Java 中,Executors 类提供了快速创建线程池的方法,但在生产环境中,建议手动使用 ThreadPoolExecutor 来创建线程池。这是因为 Executors 提供的某些方法可能导致内存溢出(OOM)等问题。

实例1: newFixedThreadPool 使用无界队列,可能因任务积压导致 OOM

/*** 触发OOM(OutOfMemoryError)的测试方法* 通过向固定大小的线程池中提交大量任务,每个任务在执行时生成大量的字符串并保持在内存中* 直到线程池被关闭,以此来模拟和测试OOM的情况* * @throws InterruptedException 如果在等待线程池终止时被中断*/
@GetMapping("oom1")
public void oom1() throws InterruptedException {// 创建一个固定大小为1的线程池,以控制并发任务的数量ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);// 打印线程池的统计信息,监控线程池的状态printStats(threadPool);// 提交给线程池大量的任务,以模拟高并发的场景for (int i = 0; i < 100000000; i++) {// 每个任务在执行时会生成一个很大的字符串,并尝试将其保持在内存中threadPool.execute(() -> {// 生成一个由大量字符组成的字符串,以占用大量内存String payload = IntStream.rangeClosed(1, 1000000).mapToObj(__ -> "a").collect(Collectors.joining("")) + UUID.randomUUID().toString();// 使当前任务暂停1小时,模拟长时间运行的任务try {TimeUnit.HOURS.sleep(1);} catch (InterruptedException e) {// 捕获中断异常,但不执行任何操作}// 记录生成的字符串,进一步增加内存的使用log.info(payload);});}// 关闭线程池,不再接受新的任务threadPool.shutdown();// 等待线程池中的所有任务完成,或直到指定的超时时间结束threadPool.awaitTermination(1, TimeUnit.HOURS);
}/*** 定期打印线程池的运行统计信息* 此方法内部创建了一个新的单线程调度器,用于定期执行打印线程池统计信息的任务* 它提供了线程池大小、活动线程数、已完成任务数和队列中任务数的信息* 这些信息有助于监控线程池的性能和工作负载* * @param threadPool 线程池对象,其统计信息将被打印*/
private void printStats(ThreadPoolExecutor threadPool) {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {// 打印分割线,用于区分不同的统计时间点log.info("=========================");// 打印线程池当前的线程数量log.info("Pool Size: {}", threadPool.getPoolSize());// 打印当前活动线程的数量log.info("Active Threads: {}", threadPool.getActiveCount());// 打印已完成任务的总数log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());// 打印队列中等待执行的任务数量log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());// 再次打印分割线,结束本次统计信息的打印log.info("=========================");}, 0, 1, TimeUnit.SECONDS);
}

在这里插入图片描述

newFixedThreadPool 方法线程池的工作队列直接 new 了一个
LinkedBlockingQueue,而默认构造方法的 LinkedBlockingQueue 是一个Integer.MAX_VALUE 长度的队列,可以认为是无界的

虽然使用 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM


实例2: newCachedThreadPool 会创建大量线程,可能因线程数量过多导致无法创建新线程。

/*** 触发OOM(OutOfMemoryError)的测试方法* 通过创建大量的线程和字符串对象,最终导致内存溢出* 此方法主要用于演示和测试目的,实际应用中应避免此类设计*/
@GetMapping("oom2")
public void oom2() throws InterruptedException {// 创建一个可缓存的线程池,按需(每个任务)创建新线程ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();// 打印线程池的统计信息printStats(threadPool);// 循环提交大量任务到线程池for (int i = 0; i < 100000000; i++) {// 每个任务生成一个随机UUID字符串,并尝试休眠1小时threadPool.execute(() -> {String payload = UUID.randomUUID().toString();try {TimeUnit.HOURS.sleep(1);} catch (InterruptedException e) {// 捕获中断异常,但不执行任何操作}// 日志记录生成的UUID字符串log.info(payload);});}// 关闭线程池,不再接受新任务threadPool.shutdown();// 等待线程池中的所有任务完成,最多等待1小时threadPool.awaitTermination(1, TimeUnit.HOURS);
}

在这里插入图片描述

查看newCachedThreadPool 的源码可以看到,这种线程池的最大线程数是Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。

由于我们的任务需要 1 小时才能执行完成,大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM。


不建议使用 Executors 提供的两种快捷的线程池

  • 需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数

  • 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题

除了建议手动声明线程池以外,还建议用一些监控手段来观察线程池的状态。线程池这个组件往往会表现得任劳任怨、默默无闻,除非是出现了拒绝策略,否则压力再大都不会抛出一个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题


线程池参数设置的最佳实践

根据应用场景,合理设置以下参数:

  • 核心线程数:应根据任务的并发性和执行时间进行调整。
  • 最大线程数:应限制线程数量以防止资源耗尽。
  • 工作队列:应使用有界队列来防止无穷的任务积压。
  • 拒绝策略:根据应用需求选择合适的拒绝策略,例如 AbortPolicyCallerRunsPolicy
import com.google.common.util.concurrent.ThreadFactoryBuilder;/*** 处理 "good" GET 请求的控制器方法* 该方法演示了如何在Spring MVC环境中使用线程池执行异步任务* 它创建了一个固定大小的线程池,并提交了多个任务去执行* * @return 返回 AtomicInteger 的值,用于跟踪任务的完成数量* @throws InterruptedException 如果在等待过程中线程被中断*/
@GetMapping("good")
public int good() throws InterruptedException {// 使用 AtomicInteger 来跟踪任务的唯一标识AtomicInteger atomicInteger = new AtomicInteger();// 创建一个 ThreadPoolExecutor,配置核心线程数、最大线程数、空闲线程存活时间、工作队列等ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5,5, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build(),new ThreadPoolExecutor.AbortPolicy());// 打印线程池的统计信息,监控线程池状态printStats(threadPool);// 生成并提交 20 个任务到线程池IntStream.rangeClosed(1, 20).forEach(i -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 获取并递增任务IDint id = atomicInteger.incrementAndGet();try {// 提交任务到线程池执行threadPool.submit(() -> {log.info("{} started", id);try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {}log.info("{} finished", id);});} catch (Exception ex) {// 如果任务提交失败,记录错误信息并递减任务IDlog.error("error submitting task {}", id, ex);atomicInteger.decrementAndGet();}});// 主线程休眠60秒,等待任务完成TimeUnit.SECONDS.sleep(60);// 返回完成的任务数量return atomicInteger.intValue();
}

初始化线程池:

创建一个 AtomicInteger 对象 atomicInteger,用于记录任务ID。
创建一个 ThreadPoolExecutor 对象 threadPool,配置核心线程数为2,最大线程数为5,空闲线程存活时间为5秒,任务队列容量为10,线程工厂设置名称格式,拒绝策略为AbortPolicy。

打印线程池状态:

调用 printStats 方法,每隔1秒打印线程池的当前状态。

提交任务:

  • 使用 IntStream.rangeClosed(1, 20) 生成1到20的整数流。
  • 每次迭代中,休眠1秒,获取任务ID,尝试提交任务到线程池。
  • 任务内容为记录开始日志,休眠10秒,记录结束日志。
  • 如果提交失败,记录错误并减少任务计数。

等待和返回结果:

等待60秒后,返回提交任务的总数。

线程池默认的工作行为

  1. 核心线程数 (corePoolSize):

    • 线程池在没有任务时,会保持corePoolSize个线程活跃。
    • 当有任务提交时,线程池会首先尝试复用这些核心线程。
  2. 任务堆积

    • 如果所有核心线程都在忙碌,后续的任务会被添加到工作队列(如LinkedBlockingQueue)中等待处理。
    • 这个工作队列的大小是有限的,默认情况下是无界的。
  3. 扩容

    • 一旦工作队列满了,线程池会尝试扩容,创建新的工作线程,直到达到maximumPoolSize
    • maximumPoolSize是线程池允许的最大线程数。
  4. 拒绝策略

    • 如果队列和线程池都已满,线程池会根据预设的拒绝策略(如AbortPolicyCallerRunsPolicy等)处理新提交的任务。
  5. 线程回收

    • 当线程数大于corePoolSize时,如果线程在keepAliveTime内没有处理新任务,它们会被终止,回收至核心线程数。

以下是ThreadPoolExecutor构造函数的参数及其影响:

  • corePoolSize:核心线程数,保持活跃的最小线程数。
  • maximumPoolSize:最大线程数,能够创建的最大线程数量。
  • keepAliveTime:非核心线程闲置时间,超过此时间后将被回收。
  • unit:时间单位,与keepAliveTime一起使用。
  • workQueue:用于存储等待执行任务的队列。
  • handler:拒绝策略,用于处理无法被执行的任务。

预先启动核心线程

在Java的ThreadPoolExecutor中,预先启动核心线程意味着在创建线程池时,线程池会立即启动并激活corePoolSize个核心线程。这可以减少任务到达时的响应延迟,尤其是在预计会有大量任务同时提交的场景下。

相关参数和方法

  • 核心线程数 (corePoolSize):指定要保持活跃的最小线程数。
  • allowCoreThreadTimeOut:如果设置为true,当核心线程在keepAliveTime内没有执行任务时,它们会被回收。

预先启动核心线程的方法

可以使用prestartAllCoreThreads()方法来预先启动所有核心线程:

  • prestartCoreThread():启动单个核心线程(如果核心线程未被创建)。
  • prestartAllCoreThreads():启动所有核心线程。

演示如何预先启动核心线程:

import java.util.concurrent.*;public class PrestartCoreThreadsExample {public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(2, // corePoolSize4, // maximumPoolSize60, // keepAliveTimeTimeUnit.SECONDS, // unitnew LinkedBlockingQueue<>() // workQueue);// 预先启动所有核心线程executor.prestartAllCoreThreads();// 提交任务for (int i = 0; i < 5; i++) {final int taskId = i;executor.execute(() -> {System.out.println("Executing task " + taskId);try {Thread.sleep(1000); // 模拟任务执行} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}executor.shutdown(); // 关闭线程池}
}

监控线程池状态的方法

建议在生产环境中添加监控,定期输出线程池的状态信息。可以使用定时任务定期打印线程池的基本信息,如线程数、活跃线程数、完成任务数量等。

/*** 定期打印线程池的运行统计信息* 此方法内部创建了一个新的单线程调度器,用于定期执行打印线程池统计信息的任务* 它提供了线程池大小、活动线程数、已完成任务数和队列中任务数的信息* 这些信息有助于监控线程池的性能和工作负载* * @param threadPool 线程池对象,其统计信息将被打印*/
private void printStats(ThreadPoolExecutor threadPool) {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {// 打印分割线,用于区分不同的统计时间点log.info("=========================");// 打印线程池当前的线程数量log.info("Pool Size: {}", threadPool.getPoolSize());// 打印当前活动的线程数量log.info("Active Threads: {}", threadPool.getActiveCount());// 打印已完成任务的总数log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());// 打印队列中等待执行的任务数量log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());// 再次打印分割线,结束本次统计信息的打印log.info("=========================");}, 0, 1, TimeUnit.SECONDS);
}

线程池的混用策略

要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列

  1. IO密集型任务

    • 特点:执行时间较长、数量较少,通常涉及网络请求、文件操作等。
    • 配置建议:
      • 可以增加核心线程数,因为这些任务通常会在等待IO时处于阻塞状态。
      • 不需要太大的任务队列,以避免内存消耗过大。
  2. 计算密集型任务

    • 特点:执行时间短、数量较多,主要涉及CPU计算。
    • 配置建议:
      • 线程数量应接近CPU核心数或核心数的两倍,以优化CPU资源利用。
      • 需要较长的任务队列来处理任务高峰,防止因线程不足导致任务拒绝。

优化策略

  1. 根据任务特性选择线程池

    • 对于IO密集型任务,使用较大的线程池以处理多任务并发。
    • 对于计算密集型任务,使用较小的线程池,避免线程切换的开销。
  2. 合理设置核心参数

    • corePoolSize:根据任务特性选择适当的线程数。
    • maximumPoolSize:根据系统资源设置合理的最大线程数。
    • keepAliveTime:调节非核心线程的存活时间,以便更好地应对任务波动。
  3. 使用不同类型的线程池

    • 对于短期任务,可以考虑使用CachedThreadPool,它会动态创建和回收线程。
    • 对于长期任务,可以使用FixedThreadPool,确保线程数不变,适合处理稳定负载的任务。

CachedThreadPoolFixedThreadPool的特性

  1. CachedThreadPool

    • 特点:可以动态创建线程,根据需要创建新的线程,空闲的线程会被回收,适合短时间内大量并发任务。
    • 优点:灵活性高,能够适应突发的任务需求。
    • 缺点:在任务量大且长时间运行时,可能导致资源耗尽或系统负载过高。
  2. FixedThreadPool

    • 特点:线程池中线程数固定,适合长期稳定的负载。
    • 优点:可控性强,避免了因动态创建线程导致的资源问题。
    • 缺点:在任务量大时可能会出现任务被阻塞的情况,因为线程数不够。

适用场景与局限性

  • 短期任务

    • 适合使用CachedThreadPool。当任务数量不确定且突发性强时,CachedThreadPool能够快速响应并执行任务。
    • 注意事项:应监控系统资源,以防因过多线程创建导致内存或CPU负担过重。
  • 长期任务

    • 适合使用FixedThreadPool。对于稳定负载的情况,FixedThreadPool可以确保线程数不变,提升执行效率。
    • 注意事项:线程数过少可能会导致任务排队,线程数过多可能会增加上下文切换的开销。

在这里插入图片描述


http://www.mrgr.cn/news/62109.html

相关文章:

  • EntityFrameworkCore 投影(Projection)SELECT
  • 在Windows上 安装使用repo
  • k8s helm部署kafka集群(KRaft模式)——筑梦之路
  • 智能体框架——lagent初探
  • node.js内置模块之---fs 模块
  • web实操9——session
  • 数据分析常用模型:RFM模型、漏斗模型、AARRR模型
  • 肿瘤B细胞图谱的多维探索:三篇前沿研究详解与对比
  • Spring Boot,Mybatis Plu连接 Sql Server 数据库源(根据 sql server 自动生成代码结构),解决报错
  • Scikit-learn和Keras简介
  • Redis面试总结(一)
  • springcloud整合sentinel,限流策略持久化到nacos,详细配置案例
  • MySQL—基础学习
  • 照片不完整?来试试智能扩图,简直不要太满意!(不是广告)
  • 大模型面试题63题(1-11)
  • 在vue中,使用this.$refs.myDiv.offsetHeight获取组件高度,结果却报错,是因为...
  • ljjh#True
  • Java继承的super关键字
  • 【C++刷题】力扣-#594-最长和谐子序列
  • C++ 之 VS2010 和MySQL数据库的链接问题
  • leetcode452. 用最少数量的箭引爆气球
  • Autosar AP SM中同EM相关的核心概念解析
  • 《探秘 POC 方案:开启创新之门的钥匙》
  • 如何使用SOCKS5代理提升匿名性
  • 两台主机只能单方向ping通
  • Spring Boot 创建项目详细介绍