在79页上的“Pairwise Composing Using zip() and zipWith()”中,我们提出了一个非常大胆的假设,即两个Observables总是以相同的频率和在相同的时间点产生事件。然而,如果其中一个流比另一个稍微快一点,那么从更快的Observable中发生的事件将需要等待更长的时间。为了说明这一点,让我们先来看看两个以相同速度生产的流:

Observable<Long> red = Observable.interval(10, TimeUnit.MILLISECONDS);
Observable<Long> green = Observable.interval(10, TimeUnit.MILLISECONDS);
Observable.zip(
    red.timestamp(),
    green.timestamp(),
    (r, g) -> r.getTimestampMillis() - g.getTimestampMillis()
).forEach(System.out::println);

red和green这两个Observable以相同的频率生产事件。对于每个条目,我们附加时间戳(),以便我们知道它何时发出。

timestamp()

timestamp()操作符用rx.schedulers.Timestamped<T>包装事件类型T。timestamp <T >类具有两个属性:T类型的原始值和创建时的long类型的时间戳。

在zip()转换中,我们只需比较每个流中事件的创建时间差异。当流被同步时,这个值会在零附近摆动。但是,如果我们稍微减慢一个Observable,比如green这个Observable变成了Observable.interval(11, MILLISECONDS),情况就大不一样了。red和green这两个Observable的时间差持续上升:red被实时消耗,但它必须等待,即我们为green这个设置的减慢的时间长度。随着时间的推移,这种差异会累积起来,并可能导致过期数据甚至内存泄漏(参见315页的“内存消耗和泄漏”)。在实践中,必须小心使用zip()。

我们实际期望的是,每次上游产生一个事件时,并使用另一个流中的最新已知值发射出一个元组对。这就是combineLatest()操作符变得有用的地方,如下图所示:

用下面这个人造的例子为例,S0,S1,S2值每间隔17毫秒发射,而另一个F0,F1,F2每间隔10毫秒发射(相当快):

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static rx.Observable.interval;
Observable.combineLatest(
    interval(17, MILLISECONDS).map(x -> "S" + x),
    interval(10, MILLISECONDS).map(x -> "F" + x),
    (s, f) -> f + ":" + s
).forEach(System.out::println);

我们将这两种流组合起来,每当有任何一条流产生某种东西时,就会产生一个新的值。输出快的就会变得不同步,但至少会实时地消耗值,这样快的流不需要等待慢的流:

F0:S0
F1:S0
F2:S0
F2:S1
F3:S1
F4:S1
F4:S2
F5:S2
F5:S3
...
F998:S586
F998:S587
F999:S587
F1000:S587
F1000:S588
F1001:S588

注意在每个F事件上出现的下游事件:F0:S0,F1:S0,F2:S0.从这三个样本,我们发现RxJava目前主要关注在快流上的新事件,然后从漫流上取最近的一个值进行组合---在本例里是S0-------然后产生一个元组对。然而,哪一个流都不是卓越的:当慢的流上产出S1事件,它会与快流上最新的事件F2结合。大约10秒后,我们遇到F1000:S588事件。所有的加起来:在10秒内,快流产生了大约1000个事件,而漫流只产生了588个事件(10秒除以17毫秒)

withLatestFrom() operator

combineLatest操作符是对称的,这意味着它不区分它所结合的子流。但是,有时候,你希望当第一个流产生时候,可以去第二个流取一个最新的值,然后结合为元组对并发射时间,但是,反之则不然。换句话说,来自第二条流的事件不会触发一个向下的流事件;它们只在第一个流发射事件时才使用。您可以通过使用最新的withLatestFrom()操作符来实现这种行为。让我们用同样的慢和快的流来解释它:

Observable<String> fast = interval(10, MILLISECONDS).map(x -> "F" + x);
Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
slow
    .withLatestFrom(fast, (s, f) -> s + ":" + f)
    .forEach(System.out::println);

在上面的例子中,slow流是主要的,由此产生的Observable总是会在slow流发射事件时发出事件,并提供fast流到目前为止最新的值。相反,fast流只是在slow流发射某些东西时才使用的助手。作为第二个参数的这个函数,它将把所有的新值从slow流和fast流中最近的值结合起来。但是,来自fast流的新值不会传播到下游;当新的slow流出现时,它们会在内部进行更新。前面的代码片段的输出显示所有slow事件只出现一次,而一些fast事件被删除:

S0:F2
S1:F4
S2:F5
S3:F7
S4:F9
S5:F10
S6:F12
S7:F13
S8:F15
S9:F17
S10:F19
S11:F20
S12:F22
S13:F24
S14:F25
S15:F27
S16:F29
S17:F30
S18:F32

在第一个fast事件之前出现的所有slow事件都会被默默的删除掉,因为没有任何东西可以组合它们(上面例子中不会出现这种情况,因为slow设计的比fast要慢,所以定是fast先出现值,之后slow才出现值)。设计就是这样的,但是如果您确实需要保存来自主流的所有事件,您必须确保其他流尽可能快地发出一些虚拟事件。例如,您可以预先让流说一些立即发射的虚拟事件。下面的例子通过将所有事件推到100毫秒前,人为地减慢fast流的速度 (参见72页的“Postponing Events Using the delay() Operator”)。如果没有虚拟事件,我们就会失去一些slow流的事件;然而,通过使用startWith()操作符,我们创建了一个新的Observable,它水从fast流中得到的。它开始的时候就有“FX”值,然后继续产生fast流原先会产出的值:

Observable<String> fast = interval(10, MILLISECONDS)
    .map(x -> "F" + x)
    .delay(100, MILLISECONDS)
    .startWith("FX");
Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
slow
    .withLatestFrom(fast, (s, f) -> s + ":" + f)
    .forEach(System.out::println);

输出显示没有丢失任何slow事件。然而,在开始的时候,我们会看到一些虚拟的“FX”事件,直到第一个“F0”在100毫秒后出现:

S0:FX
S1:FX
S2:FX
S3:FX
S4:FX
S5:FX
S6:F1
S7:F3
S8:F4
S9:F6
...

startWith()基本上返回一个新的Observable,在订阅之后,首先发出一些常量值(如“FX”),然后才是原始的Observable的值。例如,以下代码块会以0,1,2顺序产生值:

Observable
    .just(1, 2)
    .startWith(0)
    .subscribe(System.out::println);

参见94页的“Slicing and Dicing Using skip(), takeWhile(), and Others”获取类似的concat()操作符的例子。

amb() operator

最后一个有用的小操作符是amb()(与ambWith()一起),它订阅上游它所控制的所有Observable,并等待第一个条目发射。当有一个Observable发射了最早的事件时,amb()会丢弃所有其他的流,并只转发这个最早发射事件的Observable发射的后续事件,如下面的弹珠图所示:

下面的示例演示了amb()如何处理两个流。注意initialDelay参数,该参数控制在发射第一个值之前等待的初始延迟时间:

Observable<String> stream(int initialDelay, int interval, String name) {
    return Observable
        .interval(initialDelay, interval, MILLISECONDS)
        .map(x -> name + x)
        .doOnSubscribe(() ->
            log.info("Subscribe to " + name))
        .doOnUnsubscribe(() ->
            log.info("Unsubscribe from " + name));
    }
//...
Observable.amb(
    stream(100, 17, "S"),
    stream(200, 10, "F")
).subscribe(log::info);

您可以使用非静态方法ambWith()来编写一个等效的程序,但它的可读性较差,因为它隐藏了amb()的对称性。看起来我们好像是在第一个流上处理第二个流,但是两者都应该得到同等对待:(即容易引起阅读歧义)

stream(100, 17, "S")
    .ambWith(stream(200, 10, "F"))
    .subscribe(log::info);

不管你喜欢哪个版本,结果都是一样的。慢流产生事件的频率较低,但第一个事件在100毫秒后出现,而快速流则在200毫秒后开始。amb()所做的是首先订阅两个Observables,当它遇到slow流中的第一个事件时,它立即从fast中取消订阅,并且只从slow流的一个中转发事件:

14:46:13.334: Subscribe to S
14:46:13.341: Subscribe to F
14:46:13.439: Unsubscribe from F
14:46:13.442: S0
14:46:13.456: S1
14:46:13.473: S2
14:46:13.490: S3
14:46:13.507: S4
14:46:13.525: S5

doOnSubscribe()和doOnUnsubscribe()回调用于调试目的(见270页的“doOn…() Callbacks”)。请注意,在订阅了S之后,F的取消订阅大约是100毫秒后。这是第一个Observable发出首个事件的时刻。此时,从F中收听事件不再有任何意义,因此执行了解除订阅操作

results matching ""

    No results matching ""