withLatestFrom() operator
combineLatest操作符是对称的,这意味着它不区分它所结合的子流。但是,有时候,你希望当第一个流产生时候,可以去第二个流取一个最新的值,然后结合为元组对并发射时间,但是,反之则不然。换句话说,来自第二条流的事件不会触发一个向下的流事件;它们只在第一个流发射事件时才使用。您可以通过使用最新的withLatestFrom()操作符来实现这种行为。让我们用同样的慢和快的流来解释它:
Observable<String> fast = interval(10, MILLISECONDS).map(x -> "F" + x);
Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
slow
.withLatestFrom(fast, (s, f) -> s + ":" + f)
.forEach(System.out::println);
在上面的例子中,slow流是主要的,由此产生的Observable总是会在slow流发射事件时发出事件,并提供fast流到目前为止最新的值。相反,fast流只是在slow流发射某些东西时才使用的助手。作为第二个参数的这个函数,它将把所有的新值从slow流和fast流中最近的值结合起来。但是,来自fast流的新值不会传播到下游;当新的slow流出现时,它们会在内部进行更新。前面的代码片段的输出显示所有slow事件只出现一次,而一些fast事件被删除:
S0:F2
S1:F4
S2:F5
S3:F7
S4:F9
S5:F10
S6:F12
S7:F13
S8:F15
S9:F17
S10:F19
S11:F20
S12:F22
S13:F24
S14:F25
S15:F27
S16:F29
S17:F30
S18:F32
在第一个fast事件之前出现的所有slow事件都会被默默的删除掉,因为没有任何东西可以组合它们。设计就是这样的,但是如果您确实需要保存来自主流的所有事件,您必须确保其他流尽可能快地发出一些虚拟事件。例如,您可以预先让流说一些立即发射的虚拟事件。下面的例子通过将所有事件推到100毫秒前,人为地减慢fast流的速度 (参见72页的“Postponing Events Using the delay() Operator”)。如果没有虚拟事件,我们就会失去一些slow流的事件;然而,通过使用startWith()操作符,我们创建了一个新的Observable,它水从fast流中得到的。它开始的时候就有“FX”值,然后继续产生fast流原先会产出的值:
Observable<String> fast = interval(10, MILLISECONDS)
.map(x -> "F" + x)
.delay(100, MILLISECONDS)
.startWith("FX");
Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
slow
.withLatestFrom(fast, (s, f) -> s + ":" + f)
.forEach(System.out::println);
输出显示没有丢失任何slow事件。然而,在开始的时候,我们会看到一些虚拟的“FX”事件,直到第一个“F0”在100毫秒后出现:
S0:FX
S1:FX
S2:FX
S3:FX
S4:FX
S5:FX
S6:F1
S7:F3
S8:F4
S9:F6
...
startWith()基本上返回一个新的Observable,在订阅之后,首先发出一些常量值(如“FX”),然后才是原始的Observable的值。例如,以下代码块会以0,1,2顺序产生值:
Observable
.just(1, 2)
.startWith(0)
.subscribe(System.out::println);
参见94页的“Slicing and Dicing Using skip(), takeWhile(), and Others”获取类似的concat()操作符的例子。
amb() operator
最后一个有用的小操作符是amb()(与ambWith()一起),它订阅上游它所控制的所有Observable,并等待第一个条目发射。当有一个Observable发射了最早的事件时,amb()会丢弃所有其他的流,并只转发这个最早发射事件的Observable发射的后续事件,如下面的弹珠图所示:
下面的示例演示了amb()如何处理两个流。注意initialDelay参数,该参数控制在发射第一个值之前等待的初始延迟时间:
Observable<String> stream(int initialDelay, int interval, String name) {
return Observable
.interval(initialDelay, interval, MILLISECONDS)
.map(x -> name + x)
.doOnSubscribe(() ->
log.info("Subscribe to " + name))
.doOnUnsubscribe(() ->
log.info("Unsubscribe from " + name));
}
//...
Observable.amb(
stream(100, 17, "S"),
stream(200, 10, "F")
).subscribe(log::info);
您可以使用非静态方法ambWith()来编写一个等效的程序,但它的可读性较差,因为它隐藏了amb()的对称性。看起来我们好像是在第一个流上处理第二个流,但是两者都应该得到同等对待:(即容易引起阅读歧义)
stream(100, 17, "S")
.ambWith(stream(200, 10, "F"))
.subscribe(log::info);
不管你喜欢哪个版本,结果都是一样的。慢流产生事件的频率较低,但第一个事件在100毫秒后出现,而快速流则在200毫秒后开始。amb()所做的是首先订阅两个Observables,当它遇到slow流中的第一个事件时,它立即从fast中取消订阅,并且只从slow流的一个中转发事件:
14:46:13.334: Subscribe to S
14:46:13.341: Subscribe to F
14:46:13.439: Unsubscribe from F
14:46:13.442: S0
14:46:13.456: S1
14:46:13.473: S2
14:46:13.490: S3
14:46:13.507: S4
14:46:13.525: S5
doOnSubscribe()和doOnUnsubscribe()回调用于调试目的(见270页的“doOn…() Callbacks”)。请注意,在订阅了S之后,F的取消订阅大约是100毫秒后。这是第一个Observable发出首个事件的时刻。此时,从F中收听事件不再有任何意义,因此执行了解除订阅操作