关于subscribeOn()的工作方式有几个细微的差别。首先,好奇的读者应该想知道,如果两个subscribeOn()的调用出现在Observable 和subscribe()之间会发生什么。答案很简单:最接近原始的Observable 的subscribeOn()胜利。这具有重要的实际意义。如果您正在设计一个API,并且在内部使用subscribeOn(),那么客户端代码将无法覆盖您选择的Scheduler 。这可能是一个有意识的设计决策;毕竟,API设计者可能知道哪个Scheduler是合适的。另一方面,为我刚所讨论的API提供一个重载版本,使其可以覆盖所选的Scheduler,这通常是一个好主意。
让我们来研究一下subscribeOn()的行为:
log("Starting");
Observable<String> obs = simple();
log("Created");
obs
.subscribeOn(schedulerA)
//many other operators
.subscribeOn(schedulerB)
.subscribe(
x -> log("Got " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
log("Exiting");
下面的输出只暴露了schedulerA相关的日志:
17 | main | Starting
73 | main | Created
83 | main | Exiting
84 | Sched-A-0 | Subscribed
84 | Sched-A-0 | Got A
84 | Sched-A-0 | Got B
84 | Sched-A-0 | Completed
有趣的是,为了支持schedulerA,对schedulerB的订阅并不是完全被忽视的。 schedulerB仍然被使用了很短的时间,但是它几乎没有调度任何新的操作,而所有的操作都是由schedulerA完成了。因此,多个subscribeOn()不仅会被忽略,而且还引入了少量的开销。
说到操作符,我们说过,当有新的Subscriber(订阅者)要在提供的调度器中执行时,才会使用create()方法。但是,哪个线程执行create()和subscribe()之间发生的所有这些转换呢?我们已经知道,当所有操作符在同一线程中默认执行时,默认情况下不涉及并发性:
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
.doOnNext(this::log)
.map(x -> x + '1')
.doOnNext(this::log)
.map(x -> x + '2')
.subscribeOn(schedulerA)
.doOnNext(this::log)
.subscribe(
x -> log("Got " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
log("Exiting");
我们偶尔使用doOnNext()向操作人员传递管道,以查看在此点控制的线程信息。请记住,subscribeOn()的位置是不相关的,它可以是在 Observable后立即调用,也可以是在subscribe()之前。下面的输出并不令人吃惊:
20 | main | Starting
104 | main | Created
123 | main | Exiting
124 | Sched-A-0 | Subscribed
124 | Sched-A-0 | A
124 | Sched-A-0 | A1
124 | Sched-A-0 | A12
124 | Sched-A-0 | Got A12
124 | Sched-A-0 | B
124 | Sched-A-0 | B1
124 | Sched-A-0 | B12
125 | Sched-A-0 | Got B12
注意如何调用create()并生成A和B事件。这些事件依次通过调度器的线程到达Subscriber。许多RxJava的新人认为,使用大量线程的Scheduler 将自动地对事件进行fork处理,并最终将所有结果join在一起。事实并非如此。RxJava为整个管道创建了一个单一的Worker实例(参见第146页的“Scheduler implementation details overview”),主要是为了保证事件的顺序处理。
这意味着,如果其中一个操作符的速度特别慢------例如,map()从磁盘读取数据,以转换经过的事件-----这个代价高昂的操作将在同一个线程中调用。一个坏掉的操作符可以减慢整个管道,从生产到消费。这是RxJava中的反模式,操作符应该是非阻塞、快速和尽可能纯的。
再一次,flatMap()来救援了。与其在map()中阻塞,我们可以调用flatMap()并异步收集所有结果。因此,当我们想要实现真正的并行性时,通常是使用flatMap()和merge()操作符。但即使有了flatMap(),它也不明显。假设有一个杂货店(我们称之为“RxGroceries”),它提供了购买商品的API:
class RxGroceries {
Observable<BigDecimal> purchase(String productName, int quantity) {
return Observable.fromCallable(() ->
doPurchase(productName, quantity));
}
BigDecimal doPurchase(String productName, int quantity) {
log("Purchasing " + quantity + " " + productName);
//real logic here
log("Done " + quantity + " " + productName);
return priceForProduct;
}
}
显然,doPurchase()的实现在这里是无关紧要的,只是想象一下它需要一些时间和资源来完成。我们模拟商业逻辑的方法是将sleep的时间增加一秒,如果数量更大,则会稍微高一点。像从purchase()返回的这样的阻塞Observables ,在实际的应用程序中是不寻常的。但是为了教育的目的,让我们继续保持这种方式。在购买几件商品时,我们希望尽可能地并行化,并最终计算出所有货物的总价。第一次尝试是徒劳的:
Observable<BigDecimal> totalPrice = Observable
.just("bread", "butter", "milk", "tomato", "cheese")
.subscribeOn(schedulerA) //BROKEN!!!
.map(prod -> rxGroceries.doPurchase(prod, 1))
.reduce(BigDecimal::add)
.single();
结果是正确的,它是一个只有一个值(即总价格)的Observable,使用reduce()做的计算。对于每个产品,我们都使用数量1调用doPurchase()。然而,尽管使用了由10个线程池支持的schedulerA,代码还是完全串行的:
144 | Sched-A-0 | Purchasing 1 bread
1144 | Sched-A-0 | Done 1 bread
1146 | Sched-A-0 | Purchasing 1 butter
2146 | Sched-A-0 | Done 1 butter
2146 | Sched-A-0 | Purchasing 1 milk
3147 | Sched-A-0 | Done 1 milk
3147 | Sched-A-0 | Purchasing 1 tomato
4147 | Sched-A-0 | Done 1 tomato
4147 | Sched-A-0 | Purchasing 1 cheese
5148 | Sched-A-0 | Done 1 cheese
注意每个产品如何阻塞后续产品的处理。当买面包(bread )的结束时候,黄油(butter )马上就开始了,而不是更早一点出现。奇怪的是,即使用flatMap()代替map()也没有帮助,输出也完全相同:
Observable
.just("bread", "butter", "milk", "tomato", "cheese")
.subscribeOn(schedulerA)
.flatMap(prod -> rxGroceries.purchase(prod, 1))
.reduce(BigDecimal::add)
.single();
该代码不能并发地工作,因为只有一个事件流,所以设计必须按顺序运行。否则,您的Subscriber 需要知道并发通知(onNext()、onComplete()、等等),因此这是一个公平的折衷方案。幸运的是,惯用的解决方案非常接近。主Observable 不能并行化的发射产品。然而,对于每一个产品,我们创建一个新的独立的Observable,作为purchase()调用的返回。因为他们是独立的,我们可以并发的安全地调度他们每一个:
Observable<BigDecimal> totalPrice = Observable
.just("bread", "butter", "milk", "tomato", "cheese")
.flatMap(prod ->
rxGroceries
.purchase(prod, 1)
.subscribeOn(schedulerA))
.reduce(BigDecimal::add)
.single();
你能发现subscribeOn()在哪里吗?主Observable 不做任何事情,所以它并不需要一个特殊的线程池。然而,在flatMap()中创建的每个子流都提供了一个schedulerA。每次都使用subscribeOn()时,调度器就有机会返回一个新的Worker ,因此会有一个单独的线程(简化一点):
113 | Sched-A-1 | Purchasing 1 butter
114 | Sched-A-0 | Purchasing 1 bread
125 | Sched-A-2 | Purchasing 1 milk
125 | Sched-A-3 | Purchasing 1 tomato
126 | Sched-A-4 | Purchasing 1 cheese
1126 | Sched-A-2 | Done 1 milk
1126 | Sched-A-0 | Done 1 bread
1126 | Sched-A-1 | Done 1 butter
1128 | Sched-A-3 | Done 1 tomato
1128 | Sched-A-4 | Done 1 cheese
最后,我们实现了真正的并发性。每次购买操作都在同一时间开始,并最终完成。flatMap()操作符是经过仔细设计和实现的,因此它收集来自所有独立流的所有事件,并按顺序将它们推到下游。然而,正如我们已经在第73页中的“Order of Events After flatMap()”已经了解到的,我们不能再依赖于下游事件的顺序——它们既不以事件发射的顺序开始,也不是以时间发射的顺序结束(原始序列是从面包开始的)。当事件到达reduce()操作符时,它们已经是连续且行为良好的了。
现在,您应该慢慢地离开经典的Thread 模型,并了解调度器是如何工作的。但如果你觉得很难,这里有一个简单的类比:
- 没有任何Scheduler的Observable就像一个单线程程序,且该程序在彼此之间传递数据的带有阻塞方法。
- 带有单个subscribeOn()操作符的Observable就像在后台线程中启动一个大任务。该线程中的程序仍然是连续有序,但至少它在后台运行。
- 使用flatMap()的Observable,且其内部的每一个Observable都使用了subscribeOn()操作符。这样的Observable就像java.util.concurrent中的ForkJoinPool,每个子流都是执行的一个fork(分支),而flatMap()则是一个安全的join阶段。
当然,前面的提示只适用于阻塞的Observables,这在实际应用程序中很少出现。如果您底层的的Observable已经是异步的,那么实现并发性只是了解如何组合它们,以及何时进行订阅的问题。例如,在两个流上merge()将同时订阅它们,而concat()操作符等待直到第一个流完成后才订阅第二个流。