当前位置: 首页 > news >正文

Java避坑案例 - “激进”的线程池扩容策略及实现

文章目录

  • 问题
  • 思路
  • 线程池的默认行为
  • 自定义线程池扩容策略
  • Code实现
  • 小结

在这里插入图片描述


问题

Java 线程池是先用工作队列来存放来不及处理的任务,满了之后再扩容线程池。当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。

那有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?

举个例子,任务执行得很慢,需要 10 秒,如果线程池可以优先扩容到 5个最大线程,那么这些任务最终都可以完成,而不会因为线程池扩容过晚导致慢任务来不及处理


思路

  1. 由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队列的 offer 方法,造成这个队列已满的假象呢?
  2. 由于我们 Hack 了队列,在达到了最大线程后势必会触发拒绝策略,那么能否实现一个自定义的拒绝策略处理程序,这个时候再把任务真正插入队列呢

线程池的默认行为

Java 的线程池默认行为如下:

  • 仅在有任务到来时才初始化核心线程。
  • 当核心线程满后,任务会被放入工作队列。
  • 如果工作队列满,线程池会扩容直到达到最大线程数。
  • 超过最大线程数的任务会根据拒绝策略处理。
  • 当线程数超过核心线程数时,超出线程会在空闲时被回收。

自定义线程池扩容策略

为了使线程池在任务到来时更激进地扩容,我们可以考虑以下两步:

  1. 重写工作队列的 offer 方法
    通过创建一个自定义工作队列,重写 offer 方法,使其在插入任务时总是返回 false,从而模拟队列已满的状态。

  2. 实现自定义拒绝策略
    在达到最大线程数后,我们需要定义一个拒绝策略,在这个策略中,再把任务插入到自定义工作队列中。


Code实现

public int elasticTP() throws InterruptedException {//这里开始是激进线程池的实现//创建一个容量为10的阻塞队列,用于存储待执行的任务BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10) {@Overridepublic boolean offer(Runnable e) {//先返回false,造成队列满的假象,让线程池优先扩容return false;}};//创建一个线程池,核心线程数为2,最大线程数为5,空闲线程存活时间为5秒//使用自定义的线程工厂设置线程名称格式,拒绝执行处理器为尝试再次提交任务到队列ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5,5, TimeUnit.SECONDS,queue, new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build(), (r, executor) -> {try {//等出现拒绝后再加入队列//如果希望队列满了阻塞线程而不是抛出异常,那么可以注释掉下面三行代码,修改为executor.getQueue().put(r);if (!executor.getQueue().offer(r, 0, TimeUnit.SECONDS)) {throw new RejectedExecutionException("ThreadPool queue full, failed to offer " + r.toString());}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});//激进线程池实现结束//定期打印线程池的状态信息printStats(threadPool);//每秒提交一个任务,每个任务耗时10秒执行完成,一共提交20个任务//任务编号计数器AtomicInteger atomicInteger = new AtomicInteger();//循环提交20个任务到线程池IntStream.rangeClosed(1, 20).forEach(i -> {try {//每秒提交一个任务TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//获取当前任务编号int id = atomicInteger.incrementAndGet();try {//提交任务到线程池执行threadPool.submit(() -> {//任务开始时的日志记录log.info("{} started", id);try {//模拟任务执行时间TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {//任务被中断时不做处理}//任务结束时的日志记录log.info("{} finished", id);});} catch (Exception ex) {//提交任务到线程池执行时发生错误log.error("error submitting task {}", id, ex);//任务提交失败,编号计数器回退atomicInteger.decrementAndGet();}});//等待所有任务完成TimeUnit.SECONDS.sleep(60);//返回最终的任务编号计数return atomicInteger.intValue();
}private void printStats(ThreadPoolExecutor threadPool) {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {log.info("=========================");log.info("Pool Size: {}", threadPool.getPoolSize());log.info("Active Threads: {}", threadPool.getActiveCount());log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());log.info("=========================");}, 0, 1, TimeUnit.SECONDS);
}

BlockingQueueputoffer 方法在行为上有一些重要的区别,主要体现在任务添加失败时的处理方式和返回值上。以下是详细的对比:

  • put:

    • 用于将元素添加到队列中,如果队列已满,put 会阻塞当前线程,直到队列有空间可以添加元素。
    • 语法:
      void put(E e) throws InterruptedException;
      
  • offer:

    • 尝试将元素添加到队列中,如果队列已满,则根据不同的实现可能会立即返回 false,或在某些情况下也可以选择阻塞(例如 offer(E e, long timeout, TimeUnit unit))。
    • 语法:
      boolean offer(E e);
      boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
      

返回值

  • put:

    • put 方法没有返回值,只在成功添加元素后返回。如果操作被中断,会抛出 InterruptedException
  • offer:

    • offer 方法返回一个布尔值:
      • 返回 true 表示成功添加了元素。
      • 返回 false 表示队列已满,未能添加元素。
    • offer(E e, long timeout, TimeUnit unit) 方法在指定时间内等待,如果在超时之前队列有空间可以添加元素,则返回 true,否则返回 false
  1. 适用场景
  • put:

    • 更适合用于需要确保任务被添加到队列中的场景,且在队列满时允许线程等待。
  • offer:

    • 更适合用于希望尽量避免阻塞的场景,尤其是在需要快速尝试添加任务但不希望等待的情况下。

示例代码

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);// 使用 put 方法
try {queue.put(1);  // 成功queue.put(2);  // 成功queue.put(3);  // 阻塞,直到有空间
} catch (InterruptedException e) {Thread.currentThread().interrupt();
}// 使用 offer 方法
if (queue.offer(1)) {System.out.println("成功添加1");
} else {System.out.println("队列已满");
}

总结

  • put 方法适用于需要保证添加成功的场景,可能会导致线程阻塞。
  • offer 方法适用于希望快速检查并添加元素的场景,不会阻塞,但可能会失败。

小结

通过重写 offer 方法来让工作队列显示为已满,迫使线程池在任务到来时优先扩容,是一个很巧妙的思路,哈哈哈。

线程池的实现分析

  1. 自定义工作队列

    • 通过重写 LinkedBlockingQueueoffer 方法,总是返回 false,让线程池认为工作队列已满。这样,线程池会优先扩容到最大线程数。
  2. 自定义拒绝策略

    • 在拒绝策略中,使用 executor.getQueue().offer(r, 0, TimeUnit.SECONDS) 来将任务添加到队列。如果队列已满,会抛出 RejectedExecutionException。 或者选择 executor.getQueue().put(r)
  3. 任务提交

    • 通过 AtomicInteger 来计数已提交的任务。由于每个任务需要 10 秒才能完成,每秒提交一个任务,可以模拟任务积压的情况。

进一步的改进建议

  • 动态调整核心线程数:可以在任务数量超过某个阈值时,考虑动态增加核心线程数,以便更好地利用资源。

  • 更智能的拒绝策略:可以根据具体场景实现一个更加复杂的拒绝策略,比如优先选择某些类型的任务进入队列,或者将任务缓存到内存中。

  • 监控和反馈机制:增加监控机制,实时跟踪线程池的状态,包括活跃线程数、队列长度等,可以帮助在运行时做出更好的决策。

在这里插入图片描述


http://www.mrgr.cn/news/62209.html

相关文章:

  • Nginx代理本地exe服务http为https
  • 深度学习——神经网络中前向传播、反向传播与梯度计算原理
  • vue3 如何封装aixos
  • bigwig(bw)文件转换为bed文件:
  • java基于ThreadLocal实现单例模式
  • 20、【OS】【Nuttx】建一个最小系统工程配置
  • 串口电路设计
  • 3216. 交换后字典序最小的字符串
  • 时间序列分类任务---tsfresh库
  • vscode的一些使用心得
  • Leetcode148,109以及二者的合并 -> Tencent面试算法题 - 无序双向链表转BST
  • 蓝桥杯 python day01 第一题
  • 春季测试 2023 我的题解
  • 达梦数据库在终端/控制台交互查询SQL语句,查询结果导出excel
  • Openjudge:向量点积计算
  • 【Vulnhub靶场】DC-7
  • YOLOv9模型重新参数化,将yolo.pt转为yolo-converted.pt
  • 长文 | 我如何使用 git
  • 【JavaEE】【多线程】进阶知识
  • Comsol CPU水冷散热系统流热固多场耦合仿真
  • el-datepicker此刻按钮点击失效
  • ts:常见的内置数学方法(Math)
  • 面向对象编程——重写和多态
  • UART-通用异步收发器
  • 推荐使用 CompletableFuture 框架进行异步操作,很强很方便
  • 从一到无穷大 #38:讨论 “Bazel 集成仅使用 Cmake 的依赖项目” 通用方法