您是否还记得在第67页“Wrapping Up Using flatMap()”的 Observable<LicensePlate> recognize(CarPhoto photo)方法,它从CarPhoto中异步地识出LicensePlate?我们简单地提到过,这样的流实际上可以同时使用几种算法,有些是更快,有些更精确。然而,我们不希望将这些算法的细节暴露给外部世界,我们只是想要从每一个算法中逐步得到更好的结果,从最快到最准确。
假设我们有三个相关的算法,每个算法都很好地封装在了Observavble范围内。当然,每一种算法都能产生0到无限数量的结果:
Observable<LicensePlate> fastAlgo(CarPhoto photo) {
//Fast but poor quality
}
Observable<LicensePlate> preciseAlgo(CarPhoto photo) {
//Precise but can be expensive
}
Observable<LicensePlate> experimentalAlgo(CarPhoto photo) {
//Unpredictable, running anyway
}
我们想要做的是一起运行这三种算法(参见:150页上的“Declarative Subscription with subscribeOn()”,以获取更多关于RxJava如何处理并发性的细节),并尽快收到结果。我们不关心哪个算法发出一个事件,我们想要捕获它们并聚合成一个单独的流。这就是merge()操作符所做的:
Observable<LicensePlate> all = Observable.merge(
preciseAlgo(photo),
fastAlgo(photo),
experimentalAlgo(photo)
);
:
我特意把preciseAlgo() (大概是最慢的)放在了merge调用的首个上,只是为了强调传递到merge()的Observable的顺序是任意的。merge()操作符将对所有底层的Observables保持一个引用,一旦有人订阅了Observable<LicensePlate>,它就会立刻自动订阅所有的上游Observable。无论哪一个先发出一个值,它都会被转发给所有的观察者。当然,merge()操作符遵循Rx契约(参见“27页的“Anatomy of rx.Observable”,确保事件可以串行化(不发生重叠),即使底层的流在同一时间发出一个值。下面的弹珠图展示了merge()是如何工作的:
当您想要将同一类型的多个事件源作为单个源处理时,merge()操作符被广泛使用。另外,如果您只有两个Observables想要merge在一起,那么您可以使用 obs1.mergeWith(obs2) 实例方法。
请记住,任何出现在任何Boservable上的Error都将被积极地传播到观察者上。您可以使用merge()的mergeDelayError()变体来将所有错误推迟到所有其他流完成为止。mergeDelayError()甚至会确保收集所有的异常,不光只是第一个,并把他们封装为一个 rx.exceptions.CompositeException。