Vert.x,Core - Future
Future,异步结果
在同步编程(synchronous programming)中,当调用一个方法,方法的调用者(线程)需要一直等待,直到方法结果返回。而在异步编程(asynchronous programming)中,通过异步方法,方法的调用者不需要等待结果返回,当结果就绪时候会通知我们。
In synchronous programming, when a function is called, the caller has to wait until the result is returned. One of the vert.x characteristics is being asynchronous. With an asynchronous API, you don’t wait for a result, but you are notified when this result is ready.
Vert.x的提供的API大都是异步的,调用并不需要等结果返回。我们可以通过设置处理器的方法来处理异步调用,例如前面用到的Vert.x API:
NetServer connectHandler(Handler<NetSocket> handler); // NetServer NetSocket handler(Handler<Buffer> handler); // NetSocket long setPeriodic(long delay, Handler<Long> handler) // Vertx
假设一个业务逻辑需要A() -> B() -> C() -> D() 这4个调用来完成,他们调用有向后顺序,后一个调用依赖前一个调用的结果。使用同步编程非常简单,按顺序调用就好了:
resultA = A();
resultB = B(resultA);
resultC = C(resultB);
resultD = D(resultC);
在异步编程中,这个需求会复杂些,因为顺序调用方法不能确保结果返回顺序,异步方法不会等待结果就绪就返回,调用线程将继续往下执行指令。我们可以通过下一个调用嵌套在前一个回调方法内的方式实现:
A(resultA ->{B(resultB ->{C(resultC ->{D(resultC);});});
});
上述异步调用的代码可读性是不是非常差?为此,Vert.x引入Future来承载异步结果,用来代表可能尚未准备好但将来可用的异步结果。Vert.x一些异步方法返回值就是Future,例如前面遇到的Vert.x异步API:
// listen()是异步方法,调用并不需要实际完成端口监听,监听完成后的结果(成功或失败)将放到返回的值Future对象中。
Future<NetServer> listen(); //NetServer
稍微归纳下,Vert.x中,通常使用为对象设置处理器方法来处理(可多次发生)事件,对于只执行一次的操作(如listen)的异步方法,通常返回一个Future对象。需要注意的是,这时两种处理异步结果的方法,不是不互斥的,根据需要选择使用的,也可以同时使用。
Vert.x中的Future对象用于获取异步结果,常用的方法包括:
- boolean isComplete() 判断结果是否就绪,如果结果就绪返回true;
- boolean succeeded() 判断执行是否正常结束,如果正常结束返回true;
- boolean failed() 判断执行是否异常结束(抛异常),如果执行异常返回true;
- T result() 返回正常执行的结果;
- Throwable cause() 返回异常执行抛出的异常;
- Future<T> onComplete(Handler<AsyncResult<T>> handler) 添加一个Handler来通知结果就绪;
- Future<T> onSuccess(Handler<T> handler) 添加一个Handler来处理执行成功的结果;
- Future<T> onFailure(Handler<Throwable> handler) 添加一个Handler来处理器失败;
Promise,异步结果写入
我们可以通过Future获取Vert.x异步调用的结果,那Vert.x是如何将结果放到Future的呢?答案是通过Promise。在Vert.x中, Promise用于在异步调用中写入异步结果。
Promise常用的方法包括:
- void complete() 用于设置异步调用已经结束;
- void complete(T result) 用于设置异步调用已经结束,并写入执行结果;
- void fail(Throwable cause) 用于设置异步调用失败,并写入异常;
- Future<T> future() 返回一个与promise关联的Futre对象,用于将来获取promise写入的值。
Future组合
compose方法作用于顺序组合future。compose方法传入一个函数式接口(JDK 8开始引入),对不常用的同学会有些难涩,稍微讲解以下语法:
Future<U> compose(Function<T, Future<U>> mapper) // Future.compose方法定义// Function是一个接口,有一个抽象方法apply:
public interface Function<T, R> {R apply(T t); // 抽象方法apply接收一个参数T,实现方法必须返回R类型的返回值。
}
对于compose方法的入参Function<T, Future<U>>
,假设我们有个值为String类型的Future, 需要通过compose组合另一个Future, 那么代码该如何写?
Future<String> future1 = Future.succeededFuture("Future1 Result."); // 异步调用1, 异步结果是String类型
// 编写一个匿名类, 实现Function接口,接口方法实现返回一个值为Integer的Future.
Function<String, Future<Integer>> function2 = new Function<>() {@Overridepublic Future<Integer> apply(String result) { // result传入的是future1的结果System.out.println("async call 1 result: " + result);// 在这里执行异步调用2, 并返回异步调用2的Futrue ...return Future.succeededFuture(result.length());}
};Future<Integer> future2 = future1.compose(function2);
// 最后,程序输出future2的结果,打印(future结果)字符串"Future1 Result."的长度
future2.onSuccess(result -> System.out.println(result));
我们知道,在Java中,如果一个接口只有一个抽象方法,是可以写成lambda表达式的:
Future<Integer> future2 = future1.compose(result -> {System.out.println("async call 1 result: " + result);return Future.succeededFuture(result.length());
});
通过compose方法,可以让异步调用按(组合顺序)顺序执行,而且可以将前一个调用的异步结果做为参数传递给下一个异步调用,如果其中一个异步调用失败(异常),则不会继续执行下去,最后一个Future通知执行异常。
除了compose方法, Future还提供了更多组合方法:map、recover、otherwise、andThen,以及flatMap。这里不做一一介绍。最后,举个含Future,Promise和compose的例子做为总结。
假设我们要启动一个http服务,需要:
1.先从数据库中需要的数据;
2. 根据数据库取出的数据来启动http服务器;
import java.util.logging.Level;
import java.util.logging.Logger;import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;public class Core2 {private static final Logger LOGGER = Logger.getLogger(Core1.class.getName());public static void main(String[] args) {Vertx vertx = Vertx.vertx();Future<String> dbAsyncResult = prepareDatabase(vertx, true);Future<StringBuilder> httpAsyncResult = prepareHttpServer(vertx, dbAsyncResult, false);httpAsyncResult.onComplete(asyncResult -> {if (asyncResult.succeeded()) {LOGGER.info(asyncResult.result().toString());} else {LOGGER.log(Level.SEVERE, "App Boot Failed!", asyncResult.cause());}vertx.close();});}public static Future<String> prepareDatabase(Vertx vertx, boolean fail) {Promise<String> promise = Promise.promise();vertx.setTimer(2000, event -> { // 假设在这里执行Vert.x数据库相关的异步API。if (fail) {promise.fail(new Exception("prepareDatabase failed.")); //如果执行抛异常,这写入异常。} else {LOGGER.info("prepareDatabase done.");promise.complete("Hello Service"); // 执行正常结束,写入结果。}});return promise.future(); // 返回promise的关联futrue对象。}public static Future<StringBuilder> prepareHttpServer(Vertx vertx, Future<String> databaseFuture, boolean fail) {return databaseFuture.compose(databaseResult -> { // composePromise<StringBuilder> promise = Promise.promise();vertx.setTimer(1000, event -> { // 假设在这里执行Vert.x中的http server相关的异步API。if (fail) {promise.fail(new Exception("prepareHttpServer failed."));} else {LOGGER.info("prepareHttpServer done.");StringBuilder buff = new StringBuilder();buff.append(databaseFuture.result()).append(" started.");promise.complete(buff);}});return promise.future();});}
}