Java 异步编程——异步编排(CompletableFuture)
文章目录
- 业务场景
- CompletableFuture 使用
- 线程串行化方法(转换操作)
CompletableFuture 是 JDK8 提出的一个支持非阻塞的多功能的 Future,提供了一种强大的异步编程模型,实现了 Future 接口;
在使用 Future 的过程中,大家会发现 Future 有两种方式返回结果的方式
- 阻塞方式get()
- 轮询方法isDone()
CompletableFuture 接口提供了非常多的方法用于编排异步任务,基本每个方法都有两套实现,Async 版本的函数与非 Async 版本的函数。
- 若方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
业务场景
-
查询商品详情页的逻辑比较复杂, 有些数据还需要远程调用, 必然需要花费更多的时间。
// 1.获取sku的基本信息 0.5s // 2.获取sku的图片信息 0.5s // 3.获取sku的促销信息 1s // 4.获取spu的所有销售属性 1s // 5.获取规格参数组及组下的规格参数 1.5s // 6. spu详情 1s
假如商品详情页的每个查询,需要按照标注的时间之和才能完成。即用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。
CompletableFuture 使用
-
Future 是 Java 5 添加的类,用来描述一个异步计算的结果。
虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背, 轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用
观察者设计模式
,当计算结果完成及时通知监听者呢? -
在 Java 8 中,新增加了一个包含 50 个方法左右的类:CompletableFuture,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果, 并且提供了转换和组合 CompletableFuture 的方法。CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过 get 方法阻塞或者轮询的方式获得结果, 但是这种方式不推荐使用。
-
CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。
常用方法:
-
创建异步对象并执行(CompletableFuture)
CompletableFuture 提供了多种方法来创建异步对象,其中 runAsync 和 supplyAsync 是最常用的方法。
// 创建一个不返回结果的异步任务,使用默认线程池执行。 public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable); } // 创建一个不返回结果的异步任务,使用指定的线程池执行。 public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable); } // 创建一个返回结果的异步任务,使用默认线程池执行。 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier); } // 创建一个返回结果的异步任务,使用指定的线程池执行。 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier); }
-
runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的。
-
可以传入自定义的线程池,否则就用默认的线程池。
示例:
// 创建一个线程池 ExecutorService executor = Executors.newFixedThreadPool(4); // 使用 supplyAsync 创建一个返回结果的异步任务 CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> {System.out.println("Calculating result...");try {Thread.sleep(1000); // 模拟计算} catch (InterruptedException e) {e.printStackTrace();}return 42; // 返回计算结果 }, executor); // 确保主线程不会提前结束 intFuture.join(); // 关闭线程池 executor.shutdown();
-
-
thenAccept 方法,计算完成后再操作
用于在异步任务完成后再执行某个操作,且该操作不返回计算结果。该方法只会在异步任务成功完成时被调用,如果任务失败,则不会执行。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action); }
示例:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟任务执行return 42; // 返回结果 });// 使用 thenAccept 处理结果 CompletableFuture<Void> thenAcceptFuture = future.thenAccept(result -> {System.out.println("The result is: " + result); });// 等待异步计算完成 future.join(); // 确保主线程不会提前结束try {System.out.println("【thenAcceptFuture与future是否为同一个】:" + future.equals(thenAcceptFuture));System.out.println("【输出异步任务执行结果】:" + future.get());System.out.println("【输出 thenAccept 执行结果】:" + thenAcceptFuture.get()); } catch (InterruptedException e) {throw new RuntimeException(e); } catch (ExecutionException e) {throw new RuntimeException(e); } // The result is: 42 // 【thenAcceptFuture与future是否为同一个】:false // 【输出异步任务执行结果】:42 // 【输出 thenAccept 执行结果】:null
当只需要对计算结果进行处理,而不需要返回任何新值时,使用 thenAccept 是一个合适的选择。
-
whenComplete 方法,计算完成时的回调方法(成功 whenComplete 或失败 exceptionally)
这种回调无论是在任务成功完成或发生异常时都会被调用,用于处理结果或异常。
whenComplete 方法返回一个新的 CompletableFuture,原始的 CompletableFuture 保持不变,但不返回新的计算结果,新计算结果与原始任务计算结果相同。whenComplete 只能感知异步返回的结果,不可以返回新的结果。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action); }public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(asyncPool, action); }public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {return uniWhenCompleteStage(screenExecutor(executor), action); }public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn); }
-
whenComplete 可以处理正常计算结果,exceptionally 处理异常情况。
-
whenComplete 和 whenCompleteAsync 的区别:
-
whenComplete:是执行当前任务的线程继续执行 whenComplete 的任务。
-
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
-
-
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。
示例:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟任务执行if (Math.random() > 0.5) {throw new RuntimeException("Something went wrong!");}return 42; // 返回结果 });// 使用 whenComplete 方法注册一个回调,该回调接受两个参数:结果和异常。 // 如果任务成功完成,result 将是计算的结果,exception 将为 null。 // 如果任务失败,result 将为 null,而 exception 将包含抛出的异常。 CompletableFuture<Integer> whenCompleteFuture = future.whenComplete((result, exception) -> {if (exception != null) {System.out.println("【whenComplete】Task failed with exception: " + exception.getMessage());} else {System.out.println("【whenComplete】Task completed successfully with result: " + result);} });// 使用 exceptionally 处理异常并提供默认值 CompletableFuture<Integer> exceptionFuture = whenCompleteFuture.exceptionally(ex -> {System.out.println("【exceptionally】Handling exception: " + ex.getMessage());return 0; // 提供默认值 });// 等待异步计算完成 exceptionFuture.join();// 确保主线程不会提前结束 try {System.out.println("【whenCompleteFuture与future是否为同一个】:" + future.equals(whenCompleteFuture));System.out.println("【exceptionFuture与future是否为同一个】:" + future.equals(exceptionFuture));System.out.println("【输出异步任务执行结果】:" + future.get());System.out.println("【输出 whenComplete 执行结果】:" + whenCompleteFuture.get());System.out.println("【输出 exceptionally 执行结果】:" + exceptionFuture.get()); } catch (InterruptedException e) {throw new RuntimeException(e); } catch (ExecutionException e) {throw new RuntimeException(e); }// 【whenComplete】Task completed successfully with result: 42 // 【whenCompleteFuture与future是否为同一个】:false // 【exceptionFuture与future是否为同一个】:false // 【输出异步任务执行结果】:42 // 【输出 whenComplete 执行结果】:42 // 【输出 exceptionally 执行结果】:42
-
-
handle 方法
handle 和 complete 一样,都是异步任务执行完后的处理。complete 只能感知异步返回的结果(只能得到返回的结果,不能对结果进行修改操作)。handle 即能感知异步返回的结果,并且可以返回新的结果。
handle 方法返回一个新的 CompletableFuture,原始的 CompletableFuture 保持不变,但可以返回新的计算结果,新计算结果与原始任务计算结果可以不同。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(null, fn); }public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(asyncPool, fn); }public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {return uniHandleStage(screenExecutor(executor), fn); }
示例:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟任务执行if (Math.random() > 0.5) {throw new RuntimeException("Something went wrong!");}return 42; // 返回结果 });// 使用 handle 处理结果和异常 CompletableFuture<Object> handledFuture = future.handle((result, exception) -> {if (exception != null) {System.out.println("Handling exception: " + exception.getMessage());return "Default Value"; // 返回默认值} else {System.out.println("Task completed successfully with result: " + result);return "Processed Result: " + (result * 2); // 修改返回结果// return result; } });// 等待异步计算完成并获取结果 handledFuture.join(); // 确保主线程不会提前结束try {System.out.println("【handledFuture与future是否为同一个】:" + future.equals(handledFuture));System.out.println("【输出异步任务执行结果】:" + future.get());System.out.println("【输出 handle 执行结果】:" + handledFuture.get()); } catch (InterruptedException e) {throw new RuntimeException(e); } catch (ExecutionException e) {throw new RuntimeException(e); }// Task completed successfully with result: 42 // 【handledFuture与future是否为同一个】:false // 【输出异步任务执行结果】:42 // 【输出 handle 执行结果】:Processed Result: 84
使用 handle 的场景:
- 当需要对计算结果进行修改或转换时。
- 当需要在处理结果的同时处理异常,并可能返回一个不同类型的结果时。
使用 whenComplete 的场景:
- 当仅需要在任务完成后执行某些操作(如日志记录或资源清理),而不关心返回的结果时。
- 当你希望在成功或失败时都执行某些操作,但不需要对结果进行修改时。
线程串行化方法(转换操作)
(和前端的 async、await 特像)
-
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。(能感知上一步异步任务的结果,执行接下来的异步任务并返回该任务的结果。)
-
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。(能感知上一步异步任务的结果,执行接下来的异步任务不返回该任务的结果。)
-
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行 thenRun 的后续操作。(不能感知上一步异步任务的结果,执行接下来的异步任务。)
-
带有 Async 默认是异步执行的。 同之前。
-
以上都要前置任务成功完成。
-
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService future= Executors.newFixedThreadPool(10);CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 2;return i;}, future).thenApplyAsync(res -> {return res+3;}, future);Integer integer = integerCompletableFuture.get();//此处结果为8System.out.println(integer); }