工厂方法Observable.from(Future<T>)返回早已存在的Observable<T>。然而,由于旧的Future<T>API的限制,它有几个缺点,最大的一个是在Future.get()在内部会阻塞。经典的Future<T>实现没有办法注册回调并异步处理它们,因此它们在响应式应用程序中非常无用。

相比之下,CompletableFuture 则完全不同。从语义上说,你可以像Observable 一样对待CompletableFuture ,它具有以下特征:

  • It is hot

不管是否有人注册了像thenApply()这样的回调函数,在CompletableFuture后面的计算都是非常迫切的。

  • It is cached.

在CompletableFuture后面的后台计算被急切地触发,结果被转发到所有已注册的回调。此外,如果回调在结束/完成后注册,则会立即用已结束/完成的值(或异常)调用它.

  • It emits exactly one element or exception.

原则上,Future<T>只会结束/完成一次,返回一个T类型的值或者是一个异常。这与Observable契约相符

Turning CompletableFuture into Observable with single item

首先,我们想要编写一个实用函数,它接收一个CompletableFuture<T>,并返回一个正确的Observable<T>。、

import rx.Observable;

import java.util.concurrent.CompletableFuture;

public class Util {

    static <T> Observable<T> observe(CompletableFuture<T> future) {
        return Observable.create(subscriber -> {
            future.whenComplete((value, exception) -> {
                if (exception != null) {
                    subscriber.onError(exception);
                } else {
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            });
        });
    }

}

为了对成功或者失败的结束/完成(即Completion)得到通知,我们使用CompletableFuture.whenComplete()方法。它接收两个参数,不包括其他的。如果异常不是null,说明潜在的Future 失败了。否则,我们就会获得成功的值。在这两种情况下,我们通知传入的订阅者。注意,如果在CompletableFuture完成后(其中一种或另一种方式)完成了订阅,回调将立即执行。CompletableFuture在完成后立即缓存结果,以便立即调用随后注册的回调。

很容易为取消订阅注册一个处理器(handler),它会在取消订阅时候后去撤销CompletableFuture :

//Don't do this!
subscriber.add(Subscriptions.create(
    () -> future.cancel(true)));

这是个坏主意。我们可以基于一个CompletableFuture 创建许多Observables,每个Observable 都可以有多个Subscribers。如果只有一个Subscriber 可以决定在Future 完成之前取消订阅,那么取消将影响到所有其他Subscribers。

记住,CompletableFuture是Hot类型的,并且使用Rx术语进行缓存。它立即开始计算,而Observable则直到有人真正订阅之后才开始计算。考虑到这一点,我们可以进一步改进API:

    Observable<User> rxFindById(long id) {
        return Util.observe(findByIdAsync(id));
    }
    Observable<GeoLocation> rxLocate() {
        return Util.observe(locateAsync());
    }
    Observable<Ticket> rxBook(Flight flight) {
        return Util.observe(bookAsync(flight));
    }

显然,如果您正在使用的API支持从一开始就可以使用Observable(作者的意思应该知识表达从API的构建开始,就可以用RxJava来做),那么您不需要所有这些额外的适配器层。然而,如果你所拥有的一切都是CompletableFuture ,将它们转换为Observables是高效和安全的。RxJava的优点是我们的初始问题的更简明地实现:

Observable<TravelAgency> agencies = agencies();
Observable<User> user = rxFindById(id);
Observable<GeoLocation> location = rxLocate();
Observable<Ticket> ticket = user
    .zipWith(location, (us, loc) ->
        agencies
            .flatMap(agency -> agency.rxSearch(us, loc))
            .first()
    )
    .flatMap(x -> x)
    .flatMap(this::rxBook);

使用RxJava API的客户端代码看起来不那么吵,也更容易阅读。Rx以流的形式自然支持“futures with multiple values”。如果您仍然觉得在flatMap()中的恒等式转换x ->x()有点吓人,我们可以使用Pair这个助手容器来分割zipWith():

import org.apache.commons.lang3.tuple.Pair;
//...
Observable<Ticket> ticket = user
    .zipWith(location, (usr, loc) -> Pair.of(usr, loc))
    .flatMap(pair -> agencies
        .flatMap(agency -> {
            User usr = pair.getLeft();
            GeoLocation loc = pair.getRight();
            return agency.rxSearch(usr, loc);
        }))
    .first()
    .flatMap(this::rxBook);

此时,您应该理解为什么不再需要额外的x -> x了。zipWith()接收两个独立的Observables,同时异步地等待它们。Java没有内置的Pair和元组,因此我们必须提供一个转换,它将从两个流中获取事件,并将它们组合成一个Observable<Pair<User, Location>>对象。该对象将是下游Observable的输入。之后,我们使用flatMap(),根据给定的User和Location来并发的搜索每个旅行社。flatMap()为我们进行拆包(unwrapping)(从语法角度看),因此产生的流是一个简单的Observable<Flight>。很自然地,在这两种情况下,我们都使用 first()只处理上游发生的第一个Flight 。(即最快的TravelAgency )

From Observable to CompletableFuture

在某些情况下,您所使用的API可能支持CompletableFuture,而不是RxJava。这种情况非常常见,特别是考虑到前者是JDK的一部分,而后者只是一个库。在这种情况下,将Observable转换为完全的未来是很好的。实现这个转换有两种方法:

  • Observable<T> to CompletableFuture<T>

当您期望仅从流中发出单个条目时使用此方法,例如,当Rx包装一个方法调用或请求/响应模式时。当流以释放一个值而完成时候,CompletableFuture<T> 也会成功地完成。显然,当流以异常结束或者是发射了多个值的话,那么future也会以异常完成。

  • Observable<T> to CompletableFuture<List<T>>

在这个场景中,当所有从上游Observable事件都被释放,并且流完成时,这个CompletableFuture就完成了。这只是第一个转换的特殊情况,稍后您将看到。

您可以使用以下实用程序轻松实现第一个场景:

static <T> CompletableFuture<T> toFuture(Observable<T> observable) {
    CompletableFuture<T> promise = new CompletableFuture<>();
    observable
        .single()
        .subscribe(
            promise::complete,
            promise::completeExceptionally
        );
    return promise;
}

在深入实施之前,请记住,这种转换有一个重要的副作用:它订阅Observable,从而强制评估和计算Cold类型的Observables。而且,这个转换器的每次调用都将重新订阅;这只是一个你必须意识到的设计选择。

除此之外,实现是相当有趣的。首先,我们可以使用single()来强制Observable只发出一个元素,否则就会抛出异常。如果这单个事件被发射了,并且流也完成后,我们调用CompletableFuture.complete()。事实证明,不需要任何后台线程池和异步任务,就可以从头创建一个CompletableFuture。它仍然是一个CompletableFuture,但是完成它的唯一方法 和 用信号通知所有已注册的回调都是通过显式的调用complete()。这是异步交换数据的一种有效方式,至少是在RxJava不可用时候是这样的。

在失败的情况下,我们可以在所有注册的回调函数上通过调用CompletableFuture.completeExceptionally()来触发一个Error。令人惊讶的是,这是整个实现。从toFuture返回的Future ,表现的就好像它在后台绑定了一些任务,而在现实中我们明确地完成了它。

从 Observable<T>到 CompletableFuture<List<T>>的转换非常简单:

static <T> CompletableFuture<List<T>> toFutureList(Observable<T> observable) {
    return toFuture(observable.toList());
}

CompletableFuture和Observable之间的互操作性非常有用。前者设计合理,但缺乏后者的表现力和丰富性。因此,如果您被迫在一个基于Rxjava的应用程序中处理CompletableFuture ,那么尽快应用这些简单的转换,以提供一个一致且可预测的API。确保您理解了eager(Hot类型)的Future和默认是惰性的的Observable之间的区别。

results matching ""

    No results matching ""