如果您将RxJava与现有的、阻塞的和命令式的代码相结合,可能需要将Observable转换为普通的集合。这种转换相当不愉快,它需要阻塞一个Observable ,等待知道它完成为止(即收到终端事件)。在Observable 完成之前,我们不能创建集合。BlockingObservable是一种特殊的类型,它使得可以很容易的在非响应式的环境下与Observable交互工作。在使用RxJava时,BlockingObservable可能是你最后的选择了,但这也是在组合阻塞和非阻塞代码时是不可避免的。

在第3章中,我们重构了listPeople()方法,以便它返回Observable<People>而不是List。Observable在任何意义上都不是一个Interator,所以我们的代码不会编译通过。我们想要迈出婴儿的步伐而不是大规模的重构,所以让我们尽可能地保持最小的变化范围。客户端代码如下所示:

List<Person> people = pesonDao.listPeople();
String json = marshal(people);

我们可以想象的到,marshal()方法从people 这个集合中拉取数据并将其序列化为JSON。但是现在已经不是这样了,我们不能简单地在我们想要的时候从Observable中拉取条目。Observable 负责生产(推送)条目,并通知订阅者。这种彻底的改变可以很容易地通过BlockingObservable来规避掉。这个方便的类完全独立于Observable ,并且可以通过Observable.toBlocking()方法获得。Observable 的阻塞变体有表面上类似的方法,比如single()或subscribe()等方法。但是,但是,在阻塞环境中,BlockingObservable更方便,因为阻塞环境天生对Observable的异步特性没有准备的。BlockingObservable 的操作符通常会阻塞(等待),直到完成了底层的Observable 。这与Observable 的主要概念有强烈的矛盾,即所有事物都可能是异步的、惰性的,并动态的进行处理。例如,Observable.forEach()将以异步方式接收从Observable进来的事件,而BlockingObservable.forEach()将阻塞,直到所有事件被处理完,并且流也完成。当然,异常也不会作为值(事件)继续的传播,而是在调用线程中重新抛出。

在我们的案例中,我们希望将Observable<Person >转换回List <Person >来限制重构的范围:

Observable<Person> peopleStream = personDao.listPeople();
Observable<List<Person>> peopleList = peopleStream.toList();
BlockingObservable<List<Person>> peopleBlocking = peopleList.toBlocking();
List<Person> people = peopleBlocking.single();

为了解释发生了什么,我特意将所有中间类型都保留了出来。在重构到Rx之后,我们的API返回Observable<Person> peopleStream。该流可以完全是响应式、异步性和事件驱动的,但是这与我们所需要的完全不匹配:静态List。作为第一步,我们将可Observable<Person>变为 Observable<List<Person>>。这个惰性的操作符将缓冲所有Person的事件,并将它们保存在内存中,直到接收到onCompleted()事件。此时,将发出一个类型为List <Person >的单个事件,同时包含所有已看到的事件,如下图所示::

生成的流在发射单个列表条目后立即完成。还有,这个操作符是异步的;它不会等待所有事件的到来,而是惰性地缓冲所有的值。看起来很尴尬的Observable<List<Person>> peopleList继而被转换为 BlockingObservable<List<Person>> peopleBlocking。只有当您必须提供对其他异步Observable的阻塞、静态视图时,BlockingObservable无疑是一个好主意。相应的,Observable.from(List<T>)将原本基于拉的和转换为一个Observable ,toBlocking() 做的则恰恰相反。您可能会问自己,为什么我们需要关于阻塞和非阻塞操作符的两个抽象。RxJava的作者指出,明确了底层操作符的同步和异步性质是非常重要的,不能仅仅将这方面的介绍留给JavaDoc。有两个不相关的类型可以确保您总是使用适当的数据结构。此外,BlockingObservable 是你最后的武器;正常情况下,你应该尽可能长的组合和链接Observables。然而,为了这个练习的目的,让我们马上逃离Observable 。最后一个操作符single()会完全删除observables对象,并只提取一个,并且只有一个,即我们期望从BlockingObservable<T>接收到的条目。一个类似的运算符,first(),将返回一个T类型的值,并丢弃它所剩下的任何值。另一方面,single()确保在接收到终端事件之前,不会有更多的挂起事件。这意味着single()将阻塞等待onCompleted()回调。下面是与之前相同的代码片段,这一次,所有的操作符都被链接起来:

List<Person> people = personDao
    .listPeople()
    .toList()
    .toBlocking()
    .single();

你可能会认为,我们经历了这么多麻烦的Observable包装和拆开,却没有明显的原因。记住,这只是第一步。下一个变型会带来一些惰性。我们现在的代码总是执行uery(“…”),并将其包装为Observable的。正如你现在所知道的,Observables(特别是Cold类型的)在定义上是惰性的。只要没有人订阅,他们就只是代表了一个从未有机会开始发射值的流。大多数情况下,您可以调用返回Observable 的方法,只要您不订阅,它就不会运行。Observable特别像Future,因为它承诺在未来会出现一个值。但是只要你不请求它,一个Cold类型的Observable就不会开始发射事件。从这个角度来看,Observable与java.util.function.Supplier<T>特别相似,根据需要生成类型T的值。Hot类型的Observable 则不同了,因为它不管你是否在监听(即订阅),它都会发射值,但我们现在没有考虑它们。Observable的存在并不是代表一个后台job或者任何副作用,它不像Future,几乎总是建议并行的执行某些操作。

results matching ""

    No results matching ""