有些情况下,您肯定希望接收和处理从上游Observable 推送来的每一个事件。但是,有些情况下,周期性抽样就足够了。最明显的例子是来自某些设备的测量值;例如,温度(与在第92页中的“Dropping Duplicates Using”比较)。设备产生新测量值的频率通常与我们无关,特别是当测量经常出现但彼此非常相似的情况下。sample()操作符定期查看上游Observable(例如,每秒查看一次)并发出最后遇到的事件。如果在最后一秒的事件段内没有发生任何事件,则没有样本被转发到下游,下一个采样是在样一秒钟后,如下图所示:

long startTime = System.currentTimeMillis();
Observable
    .interval(7, TimeUnit.MILLISECONDS)
    .timestamp()
    .sample(1, TimeUnit.SECONDS)
    .map(ts -> ts.getTimestampMillis() - startTime + "ms: " + ts.getValue())
    .take(5)
    .subscribe(System.out::println);

前面的代码片段将打印类似于以下内容的内容:

1088ms: 141
2089ms: 284
3090ms: 427
4084ms: 569
5085ms: 712

第一列显示了从订阅到样本发射的相对时间。您可以清楚地看到,第一个示例出现的时间略多于一秒(根据sample()操作符的请求),随后的示例大约是发生在一秒钟之后。更重要的是,注意这些采样的值是什么。interval()操作符发出的自然数从哦开始,每7毫秒发射一次。因此,当第一个样本被取走时,我们可以期望出现了142(1000 / 7)个事件,其中第142个值是141(因为是基于0开始的)。

我们来看看一个更复杂的样本。假设您有一个名字的列表,他们以特定的延迟的出现,比如:

Observable<String> names = Observable
    .just("Mary", "Patricia", "Linda",
    "Barbara",
    "Elizabeth", "Jennifer", "Maria", "Susan",
    "Margaret", "Dorothy");
Observable<Long> absoluteDelayMillis = Observable
    .just(0.1, 0.6, 0.9,
        1.1,
        3.3, 3.4, 3.5, 3.6,
        4.4, 4.8)
    .map(d -> (long)(d * 1_000));
Observable<String> delayedNames = names
    .zipWith(absoluteDelayMillis,
        (n, d) -> Observable
            .just(n)
            .delay(d, MILLISECONDS))
        .flatMap(o -> o);
delayedNames
    .sample(1, SECONDS)
    .subscribe(System.out::println);

首先,我们构造一系列的名字,然后是绝对延迟序列(以秒为单位,随后映射到毫秒)。使用zipWith()操作符,我们delay()某些名字 的出现;例如,Mary在订阅100毫秒后出现,而Dorothy则在4.8秒后出现。sample()操作符将周期性地(每秒钟一次)从上一个周期内的流中选择最后一个看到的名字。所以,在第一秒之后,我们打印出了Linda,之后是Barbara。现在,从订阅开始的在2000到3000毫秒之间,没有出现任何名字,因此sample()不会发出任何东西。Barbara被释放两秒钟后,我们看到了Susan。sample()将会转发完成通知(或者是错误通知),并抛弃最后一个时间采样周期(住:在实验中,我没有发现这个问题,也就是说即使你不额外添加其他操作,Dorothy也会打印的。我们目前按照作者的思路,即在打印了Susan后,4.8秒后接到Completion通知,然后转发并抛弃最后一次的采样机会)。如果我们想看到Dorothy也出现,我们可以人为地推迟完成通知,就像在这里做的那样:

static <T> Observable<T> delayedCompletion() {
    return Observable.<T>empty().delay(1, SECONDS);
}
//...
delayedNames
    .concatWith(delayedCompletion())
    .sample(1, SECONDS)
    .subscribe(System.out::println);

sample()有一个更高级的变体,它以Observable作为参数,而不是一个固定的采样周期。这第二种Observable (称为采样器)基本上规定了什么时候从上游源取样:每次采样器发出任何值,就会取一个新的样本(如果从上一个样本之后又出现了新的值的话)。您可以使用这个重载版本的sample()来动态更改采样率,或者只在非常特定的时间点取样。例如,当一个新的帧被重新绘制或当一个键被按下时候,获取某个值的快照。一个微不足道的示例可以简单地使用interval()操作符来模拟固定周期:

//equivalent:
obs.sample(1, SECONDS);
obs.sample(Observable.interval(1, SECONDS));

如您所见,关于sample()的行为有一些微妙之处。与其依赖我们对文档或手工验证的理解,还不如进行自动化测试。测试时间敏感的操作符,如sample(),将在第258页的“Virtual Time”小节中讲解。

sample()在RxJava中有一个名为throttleLast()的别名。对称地,还有throttleFirst()操作符,它会发出每个周期中出现的第一个事件。因此,在我们的名字流中应用throttleFirst()代替sample()会产生预期的结果:

Observable<String> names = Observable
    .just("Mary", "Patricia", "Linda",
        "Barbara",
        "Elizabeth", "Jennifer", "Maria", "Susan",
        "Margaret", "Dorothy");
Observable<Long> absoluteDelayMillis = Observable
    .just(0.1, 0.6, 0.9,
        1.1,
        3.3, 3.4, 3.5, 3.6,
        4.4, 4.8)
    .map(d -> (long)(d * 1_000));
    //...
delayedNames
    .throttleFirst(1, SECONDS)
    .subscribe(System.out::println);

输出如下:

Mary
Barbara
Elizabeth
Margaret

就像sample()(aka throttleLast())一样,当Barbara和Elizabeth之间没有新名字出现时,throttleFirst()不会发出任何事件。

results matching ""

    No results matching ""