当前位置: 首页 > news >正文

CompletableFuture如何优雅处理异步任务超时!妙就完了

文章目录

  • 1. 主要解决哪些业务痛点?
  • 2. 流程分析
  • 3. 上代码
  • 4. 总结一波

1. 主要解决哪些业务痛点?

小强最近一直没打黑神话悟空,闷闷不乐的,我问咋回事,最近有啥烦心事么?

他不爽的跟我说了当他CompletableFuture进行任务编排时,会发现一个问题,当一个子线程去执行任务,如果任务执行时间很长,导致后面的任务一直阻塞,他在想有没有一种办法,让子线程具有等待超时的特性。

小强对编程的热情确实是高,那我们给他一起分析一下!!

其实在CompletableFuture中,提供了CompletableFuture.get(long timeout, TimeUnit unit) 方法,可以设置超时等待时间,但是这个是对于主线程而言的,在java8中,子线程是没有办法去设置等待超时时间的

其实通俗来讲就是:

就是调用CompletableFuture.supplyAsync()相关方法时,不能够传入子线程的等待时间,因为在很多时候会遇到使用上的一些拘束:

为了让大家了解更清楚,我们带着这个问题去看一个场景题:

接下来看一个场景:

2. 流程分析

流程大体如下:

  1. 我们的主线程会同时起一个异步线程1和异步线程2,并且异步线程1会执行任务A,异步线程2会执行B。
  2. 任务A和任务B的执行时间时不确定的,可能是1秒或者是20秒。但是我们的异步线程只会等待两秒中,如果没执行完,对于A会执行兜底任务B,对于C会执行兜底任务D。

可以思考下:

那我们如何实现子线程的等待超时,可能一下子会去想到CompletableFuture.get(),但是这个是对于主线程而言的,阻塞的粒度太粗了,那如何把超时等待下放到每一个子线程去独立控制呢?

整体思路:

  1. 异步的超时控制,比如定时3s钟,肯定在3秒钟会检查任务是否执行完,肯定会有一个定时器对象到3秒钟去检查。
  2. 检查完之后,如果任务还没完成,不需要等待,直接返回默认值,走接下来的逻辑。

3. 上代码

代码搞起:

并发引擎工具类如下:

public class CompletableFutureTimeoutEngine {static ScheduledThreadPoolExecutor delayer;static ExecutorService executor = Executors.newCachedThreadPool();//定义一个延迟任务器,用来设置执行超时时间后需要执行的任务
static {delayer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("CompletableFutureDelayScheduler");return t;}});delayer.setRemoveOnCancelPolicy(true);
}//当超时时间到时,需要抛出超时异常
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {CompletableFuture<T> result = new CompletableFuture<T>();delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);return result;
}public static <T> CompletableFuture<T> completeOnTimeout(Supplier<T> supplier, long timeout, TimeUnit unit, Function<Throwable, T> function) {return completeOnTimeout(supplier, timeout, unit, function, null);
}/*** @param t                超时默认返回值* @param supplier         任务* @param timeout          超时时间* @param unit             事件单位* @param throwableHandler 异常处理* @param <T>              任务的类型* @return 任务的返回CompletableFuture<T>*/
public static <T> CompletableFuture<T> completeOnTimeout(Supplier<T> supplier, long timeout,TimeUnit unit, Function<Throwable, T> throwableHandler, T t) {return CompletableFuture.supplyAsync(supplier, executor).applyToEither(timeoutAfter(timeout, unit), Function.identity()).exceptionally((throwable) -> {Throwable cause = throwable.getCause();if (cause instanceof TimeoutException) {return t;}return throwableHandler.apply(cause);});
}}

上面主要定义了一个延迟任务器,主要用来执行超时时间后需要执行的任务,在指定的超时时间到达时,会在timeoutAfter方法抛出超时异常,主方法completeOnTimeout用来捕获是否是超过超时时间抛出的超时异常,如果是,则返回默认值,如果不是,执行正常任务的返回。

接下来,我们去调用一下:

public class TimeoutTest {private static void sleep(Long time) {try {Thread.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Integer> completableFutureA = CompletableFutureTimeoutEngine.completeOnTimeout(() -> {sleep(4000L);return 1;}, 2, TimeUnit.SECONDS, (throwable -> {throw new BusinessException("111");}), null);CompletableFuture<Integer> completableFutureB = completableFutureA.thenApply(s -> {if (s == null) {//处理B任务System.out.println("任务A超时,执行B");sleep(1000L);return 1;}return s;});CompletableFuture<Integer> completableFutureC = CompletableFutureTimeoutEngine.completeOnTimeout(() -> {sleep(5000L);return 10;}, 3, TimeUnit.SECONDS, (throwable -> {throw new BusinessException("111");}), null);CompletableFuture<Integer> completableFutureD = completableFutureC.thenApply(s -> {if (s == null) {//处理D任务System.out.println("任务C超时,执行D");sleep(1000L);return 2;}return s;});CompletableFuture<Integer> future = completableFutureB.thenCombine(completableFutureD, Integer::sum);Integer result = future.get(6, TimeUnit.SECONDS);System.out.println("sum = " + result);}}

这个demo的实行顺序就是 completableFutureA -> completableFutureB 和 completableFutureC -> completableFutureD 分别并行执行,future最后拿到指定的结果和。

去执行一下看看:

你会发现completableFutureA的超时等待时间是2秒,正常的任务执行是4s,因此返回超时默认值null,用于和completableFutureB做处理,completableFutureB判断传进来是null,返回值为1。

同样的,completableFutureC的超时等待时间是3秒,正常的任务执行是5s,因此返回超时默认值null,用于和completableFutureD做处理,completableFutureD判断传进来是null,返回值为2。

future对completableFutureB和completableFutureD做聚合,值为3。

4. 总结一波

上面的代码就实现了completableFuture子线程具有超时的特性。是不是看完之后恍然大悟,其实很多中间件都是基于java基础做的。

但是这种方式在并发不高的情况下,可以让作为工具类控制并发流转,但是在并发很高的情况下,定时器机制可能是一个瓶颈,需要换成时间轮或者在业务里面处理超时,具体看自己的业务场景。

感觉对您有所启发的话,记得帮忙点赞,收藏加关注,并且分享给需要的小伙伴!!加油!!


http://www.mrgr.cn/news/33376.html

相关文章:

  • 使用python-Spark使用的场景案例具体代码分析
  • 如何在OCI上配置并使用OCI GenAI服务的步骤
  • [OpenGL]使用OpenGL实现硬阴影效果
  • Openshift 如何更新访问控制机
  • 显示微服务间feign调用的日志
  • 标准C++ 字符串
  • 国人卖家可折叠无线充电器发起TRO专利维权,功能相同可能侵权
  • 【深入学习Redis丨第六篇】Redis哨兵模式与操作详解
  • 图神经网络的新篇章:通用、强大、可扩展的图变换器
  • WebGL基础知识快速入门
  • 空栈压数 - 华为OD统一考试(E卷)
  • thinkphp 做分布式服务+读写分离+分库分表(分区)(后续接着写)
  • 【shell脚本4】Shell脚本学习--字符串和数组
  • 掌控历史:如何通过Git版本管理工具提升你的开发效率
  • 2024华为杯E题成品文章已出!
  • 【AI算法岗面试八股面经【超全整理】——深度学习】
  • 软件开发事故分级极简理解(灾难级、高级、中级、轻微级)
  • 学习C4模型的新网站
  • 【langchain学习】深度解析:Langchain TextSplitter 与新型正则表达式分割器的性能对比
  • 单片机原理及应用详解
  • Redis数据结构之set
  • 校园美食导航:Spring Boot技术的美食发现之旅
  • 【416】【举报垃圾信息】
  • 漏洞复现_永恒之蓝
  • MySQL:事务
  • 代码编辑器 —— SourceInsight实用技巧