CompletableFuture通过提供几十种有用的方法成功地弥补了这一差距,几乎所有的方法都是不阻塞和可组合的。我们已经习惯了 map()异步地转换输入的事件。此外,Observable.flatMap()允许我们用另一个Observable替换单个事件,链接异步任务。CompletableFutures中也有类似的操作符 .假设一个服务需要两个不相关的信息片段:用户和地理定位。了解了这两种情况,我们要求几家独立的旅行社去寻找航班,我们会在第一个返回的地方预订Ticket。这最后的要求尤其难以实现,Java 8之前要求使用ExecutorCompletionService有效地找到最快的任务:
User findById(long id) {
//...
}
GeoLocation locate() {
//...
}
Ticket book(Flight flight) {
//...
}
interface TravelAgency {
Flight search(User user, GeoLocation location);
}
使用方法:
ExecutorService pool = Executors.newFixedThreadPool(10);
List<TravelAgency> agencies = //...User user = findById(id);
GeoLocation location = locate();
ExecutorCompletionService<Flight> ecs =
new ExecutorCompletionService<>(pool);
agencies.forEach(agency ->
ecs.submit(() ->
agency.search(user, location)));
Future<Flight> firstFlight = ecs.poll(5, SECONDS);
Flight flight = firstFlight.get();
book(flight);
ExecutorCompletionService在Java开发人员中并不是特别受欢迎的,CompletableFuture不再需要了。但首先注意我们如何用 ExecutorCompletionService来包装ExecutorService,这样我们就可以在他们到达的时候为完成的任务投票。如果使用ExecutorService 的话,我们会最终获得一群Future对象,而且不知道哪一个先完成。因此ExecutorCompletionService 在这里就很有用,然而,我们仍然需要牺牲一个额外的线程来阻塞等待TravelAgencies的响应。此外,我们没有利用并发(同时加载用户和地理定位)。
我们的重构将把所有的方法都转换成它们的异步副本,然后适当地组合CompletableFutures 。这样我们的代码就完全不阻塞(主线程几乎立即完成),而且我们可以尽可能地并行化:
CompletableFuture<User> findByIdAsync(long id) {
return CompletableFuture.supplyAsync(() -> findById(id));
}
CompletableFuture<GeoLocation> locateAsync() {
return CompletableFuture.supplyAsync(this::locate);
}
CompletableFuture<Ticket> bookAsync(Flight flight) {
return CompletableFuture.supplyAsync(() -> book(flight));
}
@Override
public CompletableFuture<Flight> searchAsync(User user, GeoLocation location) {
return CompletableFuture.supplyAsync(() -> search(user, location));
}
我们用一个异步的CompletableFuture简单地包装了阻塞方法。supplyAsync方法接受一个可选的执行器作为参数。如果不指定的话,它将使用 ForkJoinPool.commonPool()中的全局定义的一个。建议总是使用自定义执行器,但是为了这个示例,我们利用了缺省的一个。请记住,缺省值是在所有CompletableFutures、并行流(参见第310页的“Java 8流和CompletableFuture”)和其他一些不太明显的地方共享的。
import static java.util.function.Function.identity;
List<TravelAgency> agencies = //...
CompletableFuture<User> user = findByIdAsync(id);
CompletableFuture<GeoLocation> location = locateAsync();
CompletableFuture<Ticket> ticketFuture = user
.thenCombine(location, (User us, GeoLocation loc) -> agencies
.stream()
.map(agency -> agency.searchAsync(us, loc))
.reduce((f1, f2) ->
f1.applyToEither(f2, identity())
)
.get()
)
.thenCompose(identity())
.thenCompose(this::bookAsync);
在前面的代码示例中发生了相当多的事情。完整地解释 CompletableFuture超出了本书的范围,但是API的某些部分将在RxJava的环境中发挥作用。首先,我们异步地开始抓取User和GeoLocation。这两个操作是独立的,可以并发运行。但是,我们需要两者的结果来继续进行,当然不会阻塞和浪费客户机线程。这就是 thenCombine()方法——它接收两个CompletableFutures(分别是用户和位置的),并在完成时异步的调用回调函数。有趣的是,回调可以返回一个值,该值将成为结果CompletableFuture中的新内容,如下所示:
CompletableFuture<Long> timeFuture = //...
CompletableFuture<ZoneId> zoneFuture = //...
CompletableFuture<Instant> instantFuture = timeFuture
.thenApply(time -> Instant.ofEpochMilli(time));
CompletableFuture<ZonedDateTime> zdtFuture = instantFuture
.thenCombine(zoneFuture, (instant, zoneId) ->
ZonedDateTime.ofInstant(instant, zoneId));
CompletableFuture与Observable有很多相似之处。thenApply()执行对Future的动态转换,就像 Observable.map()一样。在我们的示例中,我们将 CompletableFuture<Long>,通过提供一个函数从Long到Instant( Instant::ofEpochMilli)来转换为 Completable Future<Instant>。稍后,我们将接收两个Futures(instantFuture和zoneFuture),并使用thenCombine()方法对它们的future的值进行转换,所转换的值即Instant和ZoneId。这个转换返回ZoneDateTime,但是因为大多数CompletableFuture操作符都是非阻塞,所以我们最终返回了一个CompletableFuture<ZonedDateTime>-------同样的,这非常类似于Observable的zip()操作符。
回到前面订票的例子,下面的代码片段可能很模糊:
List<TravelAgency> agencies = //...
agencies
.stream()
.map(agency -> agency.searchAsync(us, loc))
.reduce((f1, f2) ->
f1.applyToEither(f2, identity())
)
.get()
我们需要通过调用searchAsync()来启动每个TravelAgency的异步操作。我们立即返回一个 List<CompletableFuture<Flight>>;这是一个 非常不方便的数据结构,如果我们所需要的只是第一个完成的Future的话,这里有一些像CompletableFuture.allOf()和CompletableFuture.anyOf()的方法 。从语义的角度来看,后者正是我们所需要的——它接收一组CompletableFuture ,并只返回第一个完成了的CompletableFuture,其他的会丢弃掉 。这与 Observable.amb()非常的相似。不幸的是,anyOf()的语法非常笨拙。首先,它接受一个数组(varargs),并且它总是返回CompletableFuture<Object>,而不是那些层的Futures(例如Flight)的类型。我们可以用它,但它变得很乱:
.thenCombine(location, (User us, GeoLocation loc) -> {
List<CompletableFuture<Flight>> fs = agencies
.stream()
.map(agency -> agency.searchAsync(us, loc))
.collect(toList());
CompletableFuture[] fsArr = new CompletableFuture[fs.size()];
fs.toArray(futuresArr);
return CompletableFuture
.anyOf(futuresArr)
.thenApply(x -> ((Flight) x));
})
Stream.reduce()的技巧如下。这里存在一个 CompletableFuture.applyToEither()操作符,它接收两个CompletableFuture ,按只在第一个完成的CompletableFuture上面应用转换。当你有两个相同的任务时,你只关心第一个完成的任务,applyToEither() 转换是非常有用的。在下面的示例中,我们在两个不同的服务器上查询User:主服务器和次要服务器。无论哪一个首先完成,我们都应用一个简单的转换来提取用户的出生日期。第二个CompletableFuture 不会被中断,但是它的结果会被丢弃。很明显,我们最终会得到一个CompletableFuture<LocalDate>:
CompletableFuture<User> primaryFuture = //...
CompletableFuture<User> secondaryFuture = //...
CompletableFuture<LocalDate> ageFuture =
primaryFuture
.applyToEither(secondaryFuture,
user -> user.getBirth());
applyToEither()只能在两个CompletableFuture 上工作,而诡异的anyOf()可以取任意的数字。幸运的是,我们可以在前两个Futures中调用applyToEither(),然后获取结果(在前两个中最快的),并将它应用到上游的第三个Future(即在前3个中最快的)。通过迭代调用applyToEither(),我们得到了一个CompletableFuture,它代表了最快的任务。使用reduce()操作符可以有效地实现这个方便的技巧。最后一个警告是来自函数的identity()方法。这是applyToEither()的要求;我们必须提供一个处理第一个结果的转换。如果结果被假定为原样,我们可以使用一个恒等函数,也可以写成f -> f 或者(Flight f) -> f 。
最后,我们实现了CompletableFuture <Flight >,它是当最快的TravelAgency 响应了的以后异步完成的个任务。thenCombine()的结果还有一个小问题。不管传递给 thenCombine()的转换是什么,返回值都将被封装到一个CompletableFuture中。在我们的例子中,我们返回CompletableFuture <Flight >,所以thenCombine()结果的类型是:CompletableFuture<CompletableFuture<Flight>>。。双包装也是Observable的一个常见的问题,而且我们可以用同样的方法在两种情况下修复它:flatMap()!(参见67页的“WrappingUp Using flatMap()“)。但是要记住,在Future中,像map()一样的被称为 thenAp ply(),flatMap()被称为thenCompose() :
Observable<Observable<String>> badStream = //...
Observable<String> goodStream = badStream.flatMap(x -> x);
CompletableFuture<CompletableFuture<String>> badFuture = //...
CompletableFuture<String> goodFuture = badFuture.thenCompose(x -> x);
通常,我们使用flatMap() / thenCompose()来链接异步计算,但在这里,我们简单地打开(解包装,与包装wrap相对)不恰当的类型(我们可以看到,这里的badxx变量基本都是双层结构)。请记住,thenCompose()期望提供的转换的返回类型是CompletableFuture 。但是由于内部类型已经是一个Future,使用identity()函数,或者简单的 x -> x,通过打开内部的Future来修复类型。
最后,当我们有了CompletableFuture <Flight >(缩写为flightFuture)时,我们可以调用bookAsync(),它以一个Flight作为参数:
CompletableFuture<Ticket> ticketFuture = flightFuture
.thenCompose(flight -> bookAsync(flight));
这一次,在调用bookAsync()时更自然地使用了thenCompose()。该方法将返回CompletableFuture <Ticket >,因此为了避免双重包装,我们选择thenCompose()而不是thenApply()