当前位置: 首页 > news >正文

JDK17 CompletableFuture

文章目录

  • 背景
  • CompletableFuture定义与用法
  • Completion
  • thenApplyAsync方法
  • run()方法
  • postComplete()

背景

Java的异步并发Future接口表示异步计算的结果。CompletableFuture是对Future接口的增强,它实现CompletionStage接口,允许链式组合异步操作,组合多个异步任务的结果,处理异常情况,任务结束时执行回调方法。

CompletableFuture定义与用法

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {volatile Object result;       // Either the result or boxed AltResultvolatile Completion stack;
}

CompletableFuture类有2个成员变量,volatile修饰符保证多线程环境下变量可见性。result表示当前任务的结果,可能是正常结果,可能是异常对象。stack是当前任务确定结果后接下来所有执行的任务栈。

CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

常用的创建方法中,supplyAsync有返回值,runAsync没有返回值(返回值类型为Void)。async表示异步,即runnable任务不由调用线程执行,而是由线程池执行。
默认线程池是ForkJoinPool,这是一个全局唯一的线程池。为了管理任务,建议对不同类型任务分别分配线程池。

Completion

abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;abstract CompletableFuture<?> tryFire(int mode);abstract boolean isLive();public final void run()                { tryFire(ASYNC); }public final boolean exec()            { tryFire(ASYNC); return false; }public final Void getRawResult()       { return null; }public final void setRawResult(Void v) {}
}

CompletionCompletableFuture的内部类。CompletableFuture强调存储任务结果,而Completion强调计算单个任务以及组合多个任务,强调动作,所以它继承Runnable,可以异步执行。例子:

ExecutorService executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
CompletableFuture base = new CompletableFuture();
base.thenApply(u -> {return "h";
});
base.thenAcceptAsync(s -> System.out.println(s), executor);
base.thenRunAsync(() -> System.out.println("c"), executor);
// 时间点1
base.complete("s");

程序运行到时间点1,base任务还没完成(等base.complete("s")结束base对象的result=s才算完成),它将三个thenXxx()方法装在栈里,以便将来调用,关系如图。每个thenXxx()方法都对应一个Completition对象,后调用的方法在栈顶,先调用的在栈底。栈的底层是链表。stack表示当前任务完成后将要执行的任务,即栈顶元素。
注意:这里的栈是Treiber stacks,用CAS原子性更改栈顶元素,实现线程安全地更改栈。
在这里插入图片描述
源码中thenApply()方法对应的不是Completion类本身,而是它的子类UniApply类。Completion的子类很多,如下图,它们的区别是

  1. 输入不同。比如Uni开头的子类表示1个输入,Bi开头的子类表示2个输入。比如UniApply接收Function类型任务,UniRun接受Runnable类型任务。
  2. 功能不同。比如BiRun表示两个输入都已运算结束才行,OrRun表示其中1个结束就行。比如Signaller类用于当前线程挂起后(比如get()方法)将来唤醒本线程(就是执行LockSupport.unpark(本线程对象))。
    在这里插入图片描述
    图片来自https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html。

thenApplyAsync方法

CompletableFuture#thenApplyAsync()为例,分析执行流程。如果前置任务已经有结果,那么不加入stack栈,直接运算,如果前置任务还没算完,那么用unipush()方法加入栈。

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {// screenExecutor()方法的作用是校验线程池参数return uniApplyStage(screenExecutor(executor), fn);
}
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();Object r;if ((r = result) != null) // `result`是前置任务的结果,即本次调用`thenApplyAsync()`方法的`CompletableFuture`对象的`result`变量,即上面代码中base对象的result变量。return uniApplyNow(r, e, f); // `uniApplyNow()`方法的含义是:本线程立即执行,不加入`stack`栈CompletableFuture<V> d = newIncompleteFuture(); // 源码中有非常多`d`,都表示本次计算的结果unipush(new UniApply<T,V>(e, d, this, f));return d;
}
private <V> CompletableFuture<V> uniApplyNow(Object r, Executor e, Function<? super T,? extends V> f) {Throwable x;CompletableFuture<V> d = newIncompleteFuture(); // d表示本次计算任务的返回结果if (r instanceof AltResult) { // 如果前置任务抛出异常if ((x = ((AltResult)r).ex) != null) {d.result = encodeThrowable(x, r); // 当前任务也不执行`Function`方法,直接抛出同样异常return d;}r = null;}try {if (e != null) {e.execute(new UniApply<T,V>(null, d, this, f)); // 交给线程池执行`UniApply`的`run()`方法。第一个参数null是线程池对象,此时已经不需要了。第二个参数`d`是本次任务的结果对象。第三个参数是前置任务的结果对象,即本次调用`thenApplyAsync()`方法的`CompletableFuture`对象。第四个参数是本次任务的方法体。} else {@SuppressWarnings("unchecked") T t = (T) r;d.result = d.encodeValue(f.apply(t)); // 本线程直接执行。t是上次任务的正常结果,d.result是本次任务的正常}} catch (Throwable ex) {d.result = encodeThrowable(ex); // 本次任务如果异常,那么本次任务结果是异常对象}return d;
}
static final class AltResult { // See above // AltResult 是异常包装类final Throwable ex;        // null only for NILAltResult(Throwable x) { this.ex = x; }
}
// `UniApply`对象的构造器方法。`src`是前置任务的结果,`dep`(方法中写作`d`)是本次任务的结果,`fn`是本次任务的方法体。
UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;
}

unipush()方法,不断自旋将任务压入栈。

final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) {// 运行至此,入栈失败if (result != null) { // result != null表示前置任务出结果了NEXT.set(c, null); // 取消`tryPushStack()`方法的`NEXT.set(c, h)`break;}// 运行至此,入栈失败,且前置任务没出结果,则不断自旋}// 运行至此,要么入栈成功,要么前置任务出结果了if (result != null)// 运行至此,前置任务出结果,那么不入栈,直接执行本次方法体,这里执行的不是`run()`方法,而是`trynFire(SYNC)`方法。c.tryFire(SYNC); }
}
final boolean tryPushStack(Completion c) {Completion h = stack; // stack是前置任务的栈顶元素,即this.stackNEXT.set(c, h);         // CAS piggyback // 这句语义等于`c.next = h`,而`NEXT`是`VarHandle`对象,能线程安全并且高效地更改对象的值return STACK.compareAndSet(this, h, c);// 如果返回true,表示当前`CompletableFuture`对象的`stack`由h,线程安全地改成了c.// stack = h, 本次CAS自旋改成了 stack = c, c.next = h。
}

run()方法

abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {abstract CompletableFuture<?> tryFire(int mode);public final void run()                { tryFire(ASYNC); }
}
class CompletableFuture {static final int SYNC   =  0;static final int ASYNC  =  1;static final int NESTED = -1;
}

Completition类定义run()方法,内部执行的是子类覆写的tryFire(int mode)方法。try表示尝试执行,可能执行失败。intmode变量其实是个枚举量,只有3个值。

  1. SYNC,同步,如果没有得到结果,本方法调用不返回。
  2. ASYNC,异步,就算没有得到结果,本方法调用也返回。
  3. NESTED,内嵌,postComplete()方法独有的。

mode不同,tryFire(int mode)方法执行流程也不同。以UniApply类的tryFire(int mode)为例。

static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn; // 成员变量`fn`,本次任务的方法体// 构造器方法UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a; Object r; Throwable x; Function<? super T,? extends V> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)// 运行至此,(a = src) == null为true,(d = dep) == null为true,(f = fn) == null)为true都表示已执行完`src = null; dep = null; fn = null;`语句,当前`UniApply`已被清空,任务已被完成// (r = a.result) == null为true表示前置任务还没完成,还不能执行当前操作return null; tryComplete: if (d.result == null) { // 当前任务结果为空if (r instanceof AltResult) { // 前置任务结果异常,则当前任务也异常if ((x = ((AltResult)r).ex) != null) {d.completeThrowable(x, r);break tryComplete;// 跳出嵌套,下一行执行`src = null; dep = null; fn = null;`}r = null; // 及时清空变量,帮助GC}try {// mode <= 0 为true表示状态不是ASYNC,即非异步执行// !claim()为true表示该任务已经被执行过了// 因此返回if (mode <= 0 && !claim())return null;else {@SuppressWarnings("unchecked") T t = (T) r;// 本线程直接方法体,t是前置任务的结果d.completeValue(f.apply(t));}} catch (Throwable ex) {d.completeThrowable(ex);}}src = null; dep = null; fn = null;// 执行至此,本次任务结束,将成员变量`src, dep, fn`都设为null,表示当前任务已完成。// 比如别的线程调用此对象的`isLive()`方法(),就返回dep对象是否为空。// final boolean isLive() { //     return dep != null; // }return d.postFire(a, mode);}
}
// UniCompletion<T,V>#claim(),UniCompletion是UniApply的父类
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// CAS原子性修改标志位,表示只有1个线程能够执行本`Completion`类// 运行至此,表示本`Completion`类第一次被执行if (e == null)return true; // 本线程直接执行,返回trueexecutor = null; // disable 成员变量executore.execute(this); // 交给线程池执行,跳出if语句,返回的是false}return false; 
}
// CompletableFuture#postFire()
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {if (a != null && a.stack != null) {Object r;if ((r = a.result) == null) // if语句为true表示当前对象未完成a.cleanStack(); // 清理a对象`stack`栈中空对象,即已经完成的对象if (mode >= 0 && (r != null || a.result != null)) // if语句为true表示当前a任务已完成,状态为`SYNC, ASNYC`而不是`NESTED`// a任务已经完成,现在要处理a对象的下游(依赖)对象a.postComplete();}if (result != null && stack != null) {// mode < 0 为true表示状态是`NESTED`,返回当前任务对象,`postComplete()`方法会用到if (mode < 0)return this;elsepostComplete(); // 当前任务已经完成,现在要处理当前任务的下游(依赖)对象}return null;
}

postComplete()

进入postComplete()方法的前提是当前任务已经有结果了,即this.result != null

final void postComplete() {/** On each step, variable f holds current dependents to pop* and run.  It is extended along only one path at a time,* pushing others to avoid unbounded recursion.*/CompletableFuture<?> f = this; Completion h; // f是当前任务,h是栈顶元素while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}NEXT.compareAndSet(h, t, null); // try to detach}f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}

以图为例,执行base.postComplete()方法。
第一次进入while循环,h = thenComp1, f = base,执行f = (d = h.tryFire(NESTED)) == null ? this : d;f=comp1Result(原因是tryFire(NESTED)方法不会链式执行,只执行1个任务就返回结果)。
在这里插入图片描述
第二次进入while循环,f=comp1Result, h = comp1Thencomp1,由于f != this,执行pushStack(h)h离开comp1Result,进入basestack
在这里插入图片描述
continue表示会第三次进入while循环,将comp1Thencomp2也加入到basestack。可以看到,comp1Result的栈元素是倒序之后进入base的栈。
在这里插入图片描述
第四次循环h != nullfalseh重新赋值为当前basestack。开始执行comp1Thencomp2任务。


http://www.mrgr.cn/news/79467.html

相关文章:

  • 面试经验分享 | 杭州某安全大厂渗透测试岗
  • std::common_type和sfinae
  • SpringBoot【十】mybatis之xml映射文件>、<=等特殊符号写法!
  • 【基于OpenEuler国产操作系统大数据实验环境搭建】
  • Security自定义逻辑认证(极简案例)
  • PCL点云环境错误收集(不断新增)
  • shell自动显示当前git的branch
  • [机器学习] 监督学习之线性回归与逻辑回归
  • C++ 完美转发和左值右值
  • 利用高德地图API,如何在PHP与vue3中实现地图缩放功能
  • UE5.5 Geometry库平面切割原理分析
  • Vue3+Vite+ElementPlus 构建 笔记
  • 深入理解 Spring MVC 中的 @RequestBody 注解
  • AI职位对项目经验有哪些具体要求?
  • 【力扣】647.回文子串
  • 【论文相关】期刊/会议 信息检索——IEEE各期刊投稿要求(待完善)
  • leetcode399:除法求值
  • AGCRN论文解读
  • 【调试工具】USB 转 UART 适配器(USB 转 TTL)
  • 【数字电路与逻辑设计】实验五 4人表决器
  • Javascript Clipper library, v6(介绍目录)
  • 代码整洁之道学习
  • 「Mac玩转仓颉内测版44」小学奥数篇7 - 二元一次方程组求解
  • C#加速Bitmap存图
  • Linux网络编程之---组播和广播
  • 【数字电路与逻辑设计】实验一 序列检测器