当前位置: 首页 > news >正文

Java 异步编程——异步编排(CompletableFuture)

文章目录

  • 业务场景
  • CompletableFuture 使用
  • 线程串行化方法(转换操作)

CompletableFuture 是 JDK8 提出的一个支持非阻塞的多功能的 Future,提供了一种强大的异步编程模型,实现了 Future 接口;

在使用 Future 的过程中,大家会发现 Future 有两种方式返回结果的方式

  1. 阻塞方式get()
  2. 轮询方法isDone()

CompletableFuture 接口提供了非常多的方法用于编排异步任务,基本每个方法都有两套实现,Async 版本的函数与非 Async 版本的函数。

  • 若方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

业务场景

  • 查询商品详情页的逻辑比较复杂, 有些数据还需要远程调用, 必然需要花费更多的时间。

    // 1.获取sku的基本信息 0.5s
    // 2.获取sku的图片信息 0.5s
    // 3.获取sku的促销信息 1s
    // 4.获取spu的所有销售属性 1s
    // 5.获取规格参数组及组下的规格参数 1.5s
    // 6. spu详情 1s
    

    假如商品详情页的每个查询,需要按照标注的时间之和才能完成。即用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

CompletableFuture 使用

  • Future 是 Java 5 添加的类,用来描述一个异步计算的结果。

    虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背, 轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式,当计算结果完成及时通知监听者呢?

  • 在 Java 8 中,新增加了一个包含 50 个方法左右的类:CompletableFuture,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果, 并且提供了转换和组合 CompletableFuture 的方法。CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过 get 方法阻塞或者轮询的方式获得结果, 但是这种方式不推荐使用。

  • CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

常用方法

  1. 创建异步对象并执行(CompletableFuture)

    CompletableFuture 提供了多种方法来创建异步对象,其中 runAsync 和 supplyAsync 是最常用的方法。

    // 创建一个不返回结果的异步任务,使用默认线程池执行。
    public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
    }
    // 创建一个不返回结果的异步任务,使用指定的线程池执行。
    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
    }
    // 创建一个返回结果的异步任务,使用默认线程池执行。
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
    }
    // 创建一个返回结果的异步任务,使用指定的线程池执行。
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    
    • runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的。

    • 可以传入自定义的线程池,否则就用默认的线程池。

    示例:

    // 创建一个线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // 使用 supplyAsync 创建一个返回结果的异步任务
    CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> {System.out.println("Calculating result...");try {Thread.sleep(1000); // 模拟计算} catch (InterruptedException e) {e.printStackTrace();}return 42; // 返回计算结果
    }, executor);
    // 确保主线程不会提前结束
    intFuture.join();
    // 关闭线程池
    executor.shutdown();
    
  2. thenAccept 方法,计算完成后再操作

    用于在异步任务完成后再执行某个操作,且该操作不返回计算结果。该方法只会在异步任务成功完成时被调用,如果任务失败,则不会执行。

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);
    }
    

    示例:

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟任务执行return 42; // 返回结果
    });// 使用 thenAccept 处理结果
    CompletableFuture<Void> thenAcceptFuture = future.thenAccept(result -> {System.out.println("The result is: " + result);
    });// 等待异步计算完成
    future.join(); // 确保主线程不会提前结束try {System.out.println("【thenAcceptFuture与future是否为同一个】:" + future.equals(thenAcceptFuture));System.out.println("【输出异步任务执行结果】:" + future.get());System.out.println("【输出 thenAccept 执行结果】:" + thenAcceptFuture.get());
    } catch (InterruptedException e) {throw new RuntimeException(e);
    } catch (ExecutionException e) {throw new RuntimeException(e);
    }
    // The result is: 42
    // 【thenAcceptFuture与future是否为同一个】:false
    // 【输出异步任务执行结果】:42
    // 【输出 thenAccept 执行结果】:null
    

    当只需要对计算结果进行处理,而不需要返回任何新值时,使用 thenAccept 是一个合适的选择。

  3. whenComplete 方法,计算完成时的回调方法(成功 whenComplete 或失败 exceptionally)

    这种回调无论是在任务成功完成或发生异常时都会被调用,用于处理结果或异常。

    whenComplete 方法返回一个新的 CompletableFuture,原始的 CompletableFuture 保持不变,但不返回新的计算结果,新计算结果与原始任务计算结果相同。whenComplete 只能感知异步返回的结果,不可以返回新的结果。

    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);
    }public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(asyncPool, action);
    }public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {return uniWhenCompleteStage(screenExecutor(executor), action);
    }public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);
    }
    
    • whenComplete 可以处理正常计算结果,exceptionally 处理异常情况。

    • whenComplete 和 whenCompleteAsync 的区别:

      • whenComplete:是执行当前任务的线程继续执行 whenComplete 的任务。

      • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

    • 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。

    示例:

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟任务执行if (Math.random() > 0.5) {throw new RuntimeException("Something went wrong!");}return 42; // 返回结果
    });// 使用 whenComplete 方法注册一个回调,该回调接受两个参数:结果和异常。
    // 如果任务成功完成,result 将是计算的结果,exception 将为 null。
    // 如果任务失败,result 将为 null,而 exception 将包含抛出的异常。
    CompletableFuture<Integer> whenCompleteFuture = future.whenComplete((result, exception) -> {if (exception != null) {System.out.println("【whenComplete】Task failed with exception: " + exception.getMessage());} else {System.out.println("【whenComplete】Task completed successfully with result: " + result);}
    });// 使用 exceptionally 处理异常并提供默认值
    CompletableFuture<Integer> exceptionFuture = whenCompleteFuture.exceptionally(ex -> {System.out.println("【exceptionally】Handling exception: " + ex.getMessage());return 0; // 提供默认值
    });// 等待异步计算完成
    exceptionFuture.join();// 确保主线程不会提前结束
    try {System.out.println("【whenCompleteFuture与future是否为同一个】:" + future.equals(whenCompleteFuture));System.out.println("【exceptionFuture与future是否为同一个】:" + future.equals(exceptionFuture));System.out.println("【输出异步任务执行结果】:" + future.get());System.out.println("【输出 whenComplete 执行结果】:" + whenCompleteFuture.get());System.out.println("【输出 exceptionally 执行结果】:" + exceptionFuture.get());
    } catch (InterruptedException e) {throw new RuntimeException(e);
    } catch (ExecutionException e) {throw new RuntimeException(e);
    }// 【whenComplete】Task completed successfully with result: 42
    // 【whenCompleteFuture与future是否为同一个】:false
    // 【exceptionFuture与future是否为同一个】:false
    // 【输出异步任务执行结果】:42
    // 【输出 whenComplete 执行结果】:42
    // 【输出 exceptionally 执行结果】:42
    
  4. handle 方法

    handle 和 complete 一样,都是异步任务执行完后的处理。complete 只能感知异步返回的结果(只能得到返回的结果,不能对结果进行修改操作)。handle 即能感知异步返回的结果,并且可以返回新的结果。

    handle 方法返回一个新的 CompletableFuture,原始的 CompletableFuture 保持不变,但可以返回新的计算结果,新计算结果与原始任务计算结果可以不同。

    public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(null, fn);
    }public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(asyncPool, fn);
    }public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {return uniHandleStage(screenExecutor(executor), fn);
    }
    

    示例:

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {// 模拟任务执行if (Math.random() > 0.5) {throw new RuntimeException("Something went wrong!");}return 42; // 返回结果
    });// 使用 handle 处理结果和异常
    CompletableFuture<Object> handledFuture = future.handle((result, exception) -> {if (exception != null) {System.out.println("Handling exception: " + exception.getMessage());return "Default Value"; // 返回默认值} else {System.out.println("Task completed successfully with result: " + result);return "Processed Result: " + (result * 2); // 修改返回结果// return result; }
    });// 等待异步计算完成并获取结果
    handledFuture.join(); // 确保主线程不会提前结束try {System.out.println("【handledFuture与future是否为同一个】:" + future.equals(handledFuture));System.out.println("【输出异步任务执行结果】:" + future.get());System.out.println("【输出 handle 执行结果】:" + handledFuture.get());
    } catch (InterruptedException e) {throw new RuntimeException(e);
    } catch (ExecutionException e) {throw new RuntimeException(e);
    }// Task completed successfully with result: 42
    // 【handledFuture与future是否为同一个】:false
    // 【输出异步任务执行结果】:42
    // 【输出 handle 执行结果】:Processed Result: 84
    

    使用 handle 的场景:

    • 当需要对计算结果进行修改或转换时。
    • 当需要在处理结果的同时处理异常,并可能返回一个不同类型的结果时。

    使用 whenComplete 的场景:

    • 当仅需要在任务完成后执行某些操作(如日志记录或资源清理),而不关心返回的结果时。
    • 当你希望在成功或失败时都执行某些操作,但不需要对结果进行修改时。

线程串行化方法(转换操作)

(和前端的 async、await 特像)

在这里插入图片描述

  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。(能感知上一步异步任务的结果,执行接下来的异步任务并返回该任务的结果。)

  • thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。(能感知上一步异步任务的结果,执行接下来的异步任务不返回该任务的结果。)

  • thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行 thenRun 的后续操作。(不能感知上一步异步任务的结果,执行接下来的异步任务。)

  • 带有 Async 默认是异步执行的。 同之前。

  • 以上都要前置任务成功完成。

  • 示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService future= Executors.newFixedThreadPool(10);CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 2;return i;}, future).thenApplyAsync(res -> {return res+3;}, future);Integer integer = integerCompletableFuture.get();//此处结果为8System.out.println(integer);
    }
    

http://www.mrgr.cn/news/55095.html

相关文章:

  • Pytorch常用函数汇总【持续更新】
  • 使用休眠的方式来解决电脑合盖后偶尔不能正常睡眠的问题
  • AI大模型是否有助于攻克重大疾病?
  • 使用Matplotlib绘制极轴散点图
  • 计算PSNR, SSIM, VAMF工具
  • PostgreSQL的学习心得和知识总结(一百五十五)|[performance]优化期间将 WHERE 子句中的 IN VALUES 替换为 ANY
  • 三周精通FastAPI:4 使用请求从客户端(例如浏览器)向 API 发送数据
  • SCTF-2024-wp
  • LabVIEW换流变换器智能巡检系统
  • 流量分类实验
  • JAVA基础【第三篇】
  • JavaScript报错:Uncaught SyntaxError: Unexpected end of input(at test.html:1:16)
  • 上市遭冷遇,AIGC难救七牛云
  • 【Linux 从基础到进阶】应用程序性能调优(Java、Python等)
  • 使用ROS一键部署LNMP环境
  • 测试测试测试07
  • 2024年10月20日
  • 给定一个正整数n随机生成n个字节即生成2n个十六进制数将其组成字符串返回secrets.token_hex(n)
  • 近似推断 - 引言篇
  • CollageController
  • 光致发光(Photoluminescence, PL)入门版
  • HTML DOM 简介
  • Manim 结构
  • Marin说PCB之GMSL2 的Layout走线的注意事项
  • HTML 区块
  • C++编程规范