empty()、never()和error()工厂看起来并不是非常有用;然而,当与真正的Observables的组合时候,它们非常方便。有趣的是,尽管RxJava都是关于事件流的异步处理,但默认情况下,前面提到的工厂方法是在客户机线程上运行的。请看下面的代码示例:

private static void log(Object msg) {
    System.out.println(
        Thread.currentThread().getName() +
        ": " + msg);
}
//...
log("Before");
Observable
    .range(5, 3)
    .subscribe(i -> {
        log(i);
    });
log("After");

我们感兴趣的是执行每个日志语句的线程:

main: Before
main: 5
main: 6
main: 7
main: After

print声明的顺序也很重要。 Before 和After消息在客户端主线程中被打印,这并不意外。但是,请注意,订阅也发生在客户端主线程中,并且在接收到所有事件之前,subscribe()实际上阻塞了客户端线程。除非某些操作符需要,RxJava不会隐式地在任何线程池中运行您的代码。为了更好地理解这种行为,我们来研究一下用于制造Observable的低级操作符:create():

Observable<Integer> ints = Observable
    .create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            log("Create");
            subscriber.onNext(5);
            subscriber.onNext(6);
            subscriber.onNext(7);
            subscriber.onCompleted();
            log("Completed");
        }
});
log("Starting");
ints.subscribe(i -> log("Element: " + i));
log("Exit");

前面的代码示例故意冗长。这里是输出,包括执行每一行的线程名称:

main: Starting
main: Create
main: Element: 5
main: Element: 6
main: Element: 7
main: Completed
main: Exit

为了了解Observable.create()如何工作,以及RxJava如何处理并发,我们将逐步分析执行步骤。首先,我们创建ints这个Observable<Integer>,提供给create()方法的参数是Observable.OnSubscribe回调接口(注意,由于本书使用的version是1.1.6,所以如果你使用了最新的1.3.4version或者更新的版本的话,会发现该方法已经被标识为过时方法,但是仍然可以在Observable接口中找到OnSubscribe接口的声明)的实现(稍之后,我们几乎总是用一个简单的lambda表达式替换它)。在这一点上, 除了创造一个 Observable 实例之外,什么都没有发生;因此,我们看到的第一行输出是"main:Starting"。默认情况下, Observable推迟发射事件,也就是说,在你真正订阅之前,它不会开始发射任何东西,因此,赋予create()的lambda表达式还没有被执行。之后,我们执行订阅,即ints.subscribe(...),强制Observable开始发射条目。这对于所谓的Cold类型的流来说是正确的。另一方面,Hot类型的流即使没有人订阅,也会发出事件。这一重要的区别将很快在第43页的“Hot and Cold Observables”中解释。

Observable.create()是万能的,事实上你可以在该方法上模仿之前发现的所有工厂方法。例如,Observable.just(x),发出单个值x,然后立即完成,可能是这样的:

static <T> Observable<T> just(T x) {
    return Observable.create(subscriber -> {
        subscriber.onNext(x);
        subscriber.onCompleted();
        }
    );
}

作为一个练习,尝试只用create()来实现never()、empty(),甚至range()工厂方法。

Managing multiple subscribers(管理多个订阅者)

消息发射不会开始,直到我们实际执行subscribe()订阅才开始。但是每次调用subscribe()时,都会执行create()中的订阅处理程序。这既不是优点也不是缺点,它只是你必须记住的东西。在某些情况下,每个订阅者都有自己指定的处理程序来进行调用,这样的效果非常好。例如,Observable.just(42),它应该向每个订阅者发送42,而不仅仅是第一个订阅者。另一方面,如果在create()中放入数据库查询或重量级计算,那么在所有订阅者中共享单个调用可能是有益的。

为了确保您真正理解订阅是如何工作的,请考虑以下代码示例,它们订阅了相同的 Observable两次:

Observable<Integer> ints =
    Observable.create(subscriber -> {
        log("Create");
        subscriber.onNext(42);
        subscriber.onCompleted();
        }
    );
log("Starting");
ints.subscribe(i -> log("Element A: " + i));
ints.subscribe(i -> log("Element B: " + i));
log("Exit");

你想要什么样的输出?请记住,每次您通过工厂方法,订阅到通过create()工厂方法创建的Observable时,lambda表达式作为一个参数传递给create(),默认在在发起订阅的线程内执行:

main: Starting
main: Create
main: Element A: 42
main: Create
main: Element B: 42
main: Exit

如果您想避免为每个订阅者调用create(),并且简单地重用已经计算过的事件,这里存在一个 cache()操作符:

Observable<Integer> ints =
    Observable<Integer>.create(subscriber -> {
        //...
        }
    )
.cache();

cache()是您学习的第一个操作符。操作符包装现有的Observable ,通常通过拦截订阅,来优化增强它们。cache()所做的是站在subscribe()和我们的自定义Observable 之间。当第一个订阅者出现时,cache()将订阅委托给底层的Observable ,并转发所有通知(事件、Completion或Error)到下游。但是,同时,它在内部保留所有通知的副本。当一个后续的订阅者想要接收推送通知,cache()不再委托给底层的Observable,而是提供缓存的值。通过缓存,两个订阅者的输出是完全不同的:

main: Starting
main: Create
main: Element A: 42
main: Element B: 42
main: Exit

当然,您必须记住,cahce()和无限流是灾难的配方,也称为OutOfMemoryError。但这将在315页的“Memory Consumption and Leaks(内存消耗和泄漏)”中进行讨论。

results matching ""

    No results matching ""