Java8中CompletableFuture.allOf的使用
目录标题
- CompletableFuture.allOf(...);
- CompletableFuture.allOf(...).get();
- CompletableFuture.allOf(...).join();
- 总结
- 如何优雅的处理异常呢?
CompletableFuture.allOf(…);
CompletableFuture.allOf(…) 本身不会等待所有的 CompletableFuture 完成,它只是返回一个新的 CompletableFuture,这个新未来对象会在所有给定的未来对象完成时完成。
默认情况下,allOf 会等待所有的任务都完成,即使其中有一个失败了,也不会影响其他任务继续执行。
public class CompletableFutureExample {public static void main(String[] args) {// 创建一个任务列表List<CompletableFuture<String>> futureList = new ArrayList<>();long startTime = System.currentTimeMillis();// 添加一些异步任务for (int i = 0; i < 5; i++) {final int taskId = i;CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {//睡眠一秒Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}if (taskId == 2) {throw new RuntimeException("task=" + taskId + ",执行异常");}return "Result of Task " + taskId;});futureList.add(future);}long endTime1 = System.currentTimeMillis();System.out.println("[startTime,endTime1]=" + (endTime1 - startTime) + "ms");// 使用 allOf 方法来等待所有任务完成CompletableFuture<Void> allFutures =CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));long endTime2 = System.currentTimeMillis();System.out.println("[startTime,endTime2]=" + (endTime2 - startTime) + "ms");long endTime3 = System.currentTimeMillis();System.out.println("[startTime,endTime3]=" + (endTime3 - startTime) + "ms");// 打印每个任务的结果for (CompletableFuture<String> future : futureList) {try {// 注意:如果某个任务失败,这里会抛出异常String result = future.get();System.out.println(result);} catch (Exception e) {System.err.println("Task failed with exception: " + e.getCause());}}long endTime4 = System.currentTimeMillis();System.out.println("[startTime,endTime4]=" + (endTime4 - startTime) + "ms");}
}
执行结果:
CompletableFuture.allOf(…).get();
该代码在调用 get() 时会阻塞当前线程,直到所有的 CompletableFuture 完成。如果其中一个或多个任务失败,它会抛出 ExecutionException,需要捕获并处理异常。
public class CompletableFutureExample {public static void main(String[] args) {// 创建一个任务列表List<CompletableFuture<String>> futureList = new ArrayList<>();long startTime = System.currentTimeMillis();// 添加一些异步任务for (int i = 0; i < 5; i++) {final int taskId = i;CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {//睡眠一秒Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}if (taskId == 2) {throw new RuntimeException("task=" + taskId + ",执行异常");}return "Result of Task " + taskId;});futureList.add(future);}long endTime1 = System.currentTimeMillis();System.out.println("[startTime,endTime1]=" + (endTime1 - startTime) + "ms");// 使用 allOf 方法来等待所有任务完成CompletableFuture<Void> allFutures =CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));long endTime2 = System.currentTimeMillis();System.out.println("[startTime,endTime2]=" + (endTime2 - startTime) + "ms");try {allFutures.get();} catch (InterruptedException | ExecutionException e) {System.err.println("An error occurred: " + e.getMessage());}long endTime3 = System.currentTimeMillis();System.out.println("[startTime,endTime3]=" + (endTime3 - startTime) + "ms");// 打印每个任务的结果for (CompletableFuture<String> future : futureList) {try {// 注意:如果某个任务失败,这里会抛出异常String result = future.get();System.out.println(result);} catch (Exception e) {System.err.println("Task failed with exception: " + e.getCause());}}long endTime4 = System.currentTimeMillis();System.out.println("[startTime,endTime4]=" + (endTime4 - startTime) + "ms");}
}
执行结果:
CompletableFuture.allOf(…).join();
该代码同样会阻塞当前线程,直到所有的 CompletableFuture 完成。不同于 get(),join() 方法在任务失败时会抛出一个未检查的异常(CompletionException),而不需要处理检查型异常。
public class CompletableFutureExample {public static void main(String[] args) {// 创建一个任务列表List<CompletableFuture<String>> futureList = new ArrayList<>();long startTime = System.currentTimeMillis();// 添加一些异步任务for (int i = 0; i < 5; i++) {final int taskId = i;CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {//睡眠一秒Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}if (taskId == 2) {throw new RuntimeException("task=" + taskId + ",执行异常");}return "Result of Task " + taskId;});futureList.add(future);}long endTime1 = System.currentTimeMillis();System.out.println("[startTime,endTime1]=" + (endTime1 - startTime) + "ms");// 使用 allOf 方法来等待所有任务完成CompletableFuture<Void> allFutures =CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));long endTime2 = System.currentTimeMillis();System.out.println("[startTime,endTime2]=" + (endTime2 - startTime) + "ms");allFutures.join();long endTime3 = System.currentTimeMillis();System.out.println("[startTime,endTime3]=" + (endTime3 - startTime) + "ms");// 打印每个任务的结果for (CompletableFuture<String> future : futureList) {try {// 注意:如果某个任务失败,这里会抛出异常String result = future.get();System.out.println(result);} catch (Exception e) {System.err.println("Task failed with exception: " + e.getCause());}}long endTime4 = System.currentTimeMillis();System.out.println("[startTime,endTime4]=" + (endTime4 - startTime) + "ms");}
}
执行结果:
不用强制捕获异常,但是如果有异常会直接抛出!
allFutures.join();
换成
try {allFutures.join();
} catch (Exception e) {System.err.println("An error occurred: " + e.getMessage());
}
执行结果:
总结
仅仅调用 CompletableFuture.allOf(…) 不会导致等待,它只构造一个新的 CompletableFuture。只有调用 get() 或 join() 才会真正等待所有线程执行完毕。get() 需要处理检查型异常,而 join() 则更简洁,不需要处理检查型异常,适合于不需要具体处理的场景。
如何优雅的处理异常呢?
如果异常的情况不需要处理,比如给默认值这种情况,那么下面的写法优雅一些。
public class CompletableFutureExample {public static void main(String[] args) {// 创建一个任务列表List<CompletableFuture<String>> futureList = new ArrayList<>();long startTime = System.currentTimeMillis();// 添加一些异步任务for (int i = 0; i < 5; i++) {final int taskId = i;CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {//睡眠一秒Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}if (taskId == 2) {throw new RuntimeException("task=" + taskId + ",执行异常");}return "Result of Task " + taskId;}).handle((result, ex) -> {if (ex != null) {// 处理异常并返回 nullSystem.err.println("Task failed: " + ex.getCause());return null; // 返回 null 表示任务失败}return result; // 返回结果});futureList.add(future);}long endTime1 = System.currentTimeMillis();System.out.println("[startTime,endTime1]=" + (endTime1 - startTime) + "ms");// 使用 allOf 方法来等待所有任务完成CompletableFuture<Void> allFutures =CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));// 处理结果和异常CompletableFuture<List<String>> resultsFuture = allFutures.thenApply(v ->futureList.stream().map(CompletableFuture::join) // 使用 join() 获取结果.filter(result -> result != null) // 过滤掉失败的结果.collect(Collectors.toList()) // 收集成功的结果);long endTime2 = System.currentTimeMillis();System.out.println("[startTime,endTime2]=" + (endTime2 - startTime) + "ms");// 打印每个任务的结果List<String> futureResultList = resultsFuture.join();long endTime3 = System.currentTimeMillis();System.out.println("[startTime,endTime3]=" + (endTime3 - startTime) + "ms");for (String result : futureResultList) {System.out.println(result);}long endTime4 = System.currentTimeMillis();System.out.println("[startTime,endTime4]=" + (endTime4 - startTime) + "ms");}
}
输出结果: