有一些第三方Api是阻塞的,我们对此无能为力。我们可能没有源代码,重写可能会带来太多的风险。在这种情况下,我们必须学习如何处理阻塞代码而不是与之对抗。
RxJava的一个特点是声明式并发,而不是命令式并发。手工创建和管理线程是我们大多数人过去(与331页的“Thread Pool of Connections”比较)已经使用的线程池托管方法(比如,使用ExecutorService)。但是RxJava更进一步:Observable可以是非阻塞的,就像Java 8中的CompletableFuture(参见第193页上的“CompletableFuture和Streams”),但与其他的不同的是,它也是懒惰的。除非你订阅它,否则一个行为良好的Observable 将不会执行任何操作。但是,Observable 的力量甚至超越了这一点。
异步的Observable是指一个从不同的线程调用您的订阅者的回调方法(如onNext())的Observable。回想一下,在第35页中的“Mastering Observable.create()”,我们研究了阻塞的subscribe(),它会等待直到所有通知到达?在现实生活中,大多数Observable 来自于从本质上来说是异步的数据源。第5章完全致力于这样的Observable,但即使是我们136页“Replacing Callbacks with Streams”研究的的简单JMS示例,它使用了从JMS规范(MessageListener接口)中内置的非阻塞API。这不是由类型系统强制执行或建议的,但是许多Observables从一开始就是异步的,您应该假设它是异步的。当Observable.create()中的,一个阻塞式的subscribe()方法实际上很少发生,即在observable . create()中的lambda不支持任何异步的过程和流。但是,默认情况下(create())所有的事情都发生在客户端线程中(即指定订阅的线程)。如果您只是在create()回调中直接戳onNext(),则不会涉及多线程和并发。
遇到这样一种与众不同的Observable,我们可以声明式的选择将用于发出值的所谓的Scheduler。如果是CompletableFuture,我们无法控制底层的线程,API自己做出了这种决定,即使在最坏的情况下也不可能重写它。RxJava很少单独做这样的决定,并选择安全的缺省:客户端线程和不涉及多线程。为了本章的目的,我们将使用一个非常简单的日志“库”,它将在程序开始使用system . currenttimemillis()来打印一条消息,以及当前线程和毫秒数。
void log(Object label) {
System.out.println(
System.currentTimeMillis() - start + "\t| " +
Thread.currentThread().getName() + "\t| " +
label);
}