buffer()和window()将多个事件组合在一起,以便您可以批量处理它们。 sample()偶尔选择一个比较随意的事件。这些运算符不考虑事件之间的时间间隔。但在许多情况下,如果事件发生后不久又发生另一个事件,那么事件(这里说的是之前的事件,扔掉旧的,捡起新的)就会被丢弃。例如,假设有一股来自交易平台的股票价格的流
Observable<BigDecimal> prices = tradingPlatform.pricesOf("NFLX");
Observable<BigDecimal> debounced = prices.debounce(100, MILLISECONDS);
debounce()(别名:throttleWithTimeout())丢弃所有紧随其后就有其他事件发生的事件。换句话说,如果一个给定事件不是出现在某个事件出现之后的一个时间窗口内的话,那么该事件就会被释放。在前面的例子中,每当“NFLX”股票的价格变化的话,price流就会推送它。价格有时会经常变化,每秒几十次。对于每一个价格变化,我们可能希望运行一些需要花费大量时间来完成的计算。然而,如果一个新的价格到来,计算结果与此无关;它必须以这个新价格开始计算。因此,如果在该事件发生之后的某个时间窗口之内又发生了新的事件的话,我们想抛弃掉旧的值。
debounce()在前面的示例中等待了一点时间(实例中是100毫秒),以防稍后第二个事件紧随出现。这个过程自己会重复,这样如果第二个事件发生在第一个事件发生之后不到100毫秒之内,RxJava将抛弃掉旧的事件,并推迟新事件的发射,因为它在等待时间窗口内会不会有第三个事件出现。这一次,您还可以选择灵活地控制在每个事件基础上等待多长时间。例如,如果在100毫秒的时间窗口内出现了新的价格更新,您可能会忽略这样的股票价格的变化。然而,如果价格超过150美元,我们愿意毫不犹豫地加快下游的更新速度。也许因为某些类型的事件需要马上处理;例如,因为他们是巨大的市场机会。您可以通过使用重载版本的debounce()实现这一点:
prices
.debounce(x -> {
boolean goodPrice = x.compareTo(BigDecimal.valueOf(150)) > 0;
return Observable
.empty()
.delay(goodPrice? 10 : 100, MILLISECONDS);
})
对于每一个更新的价格X,我们应用复杂的逻辑(> $150)来计算价格是否合适。然后,对于每一个这样的更新,我们返回一个惟一的Observable,它是空的。它不需要发射任何事件;最重要的是它的Completion事件。对于好的价格,它会在10毫秒后发出一个Completion通知。对于其他价格,此Observable在100毫秒后发射Completion通知。如果在时间窗口的事件内,是Completion的通知先到达,那么就将该事件发送到下游;如果在时间窗口中,上游Observable的事件先到达,而不是Completion通知的话,则会如此往复的循环。
在我们的示例中,当值是$140的价格X出现时,debounce()操作符创建一个新的Observable,通过我们提供的表达式延迟100毫秒。如果在此事件完成之前没有出现事件,$140事件将被转发到下游。但是,假设另一个价格更新$151出现了。这一次,当debounce()操作符要求我们提供一个Observable(在API中称为debounceSelector)时,我们返回一个更快的流, 它会在10毫秒后完成。所以,如果价格好(即大于150美元),我们愿意为后续的更新等待10毫秒。如果你还在努力了解debounce()的工作原理,这里有一个股票价格模拟器,你可以试试:
Observable<BigDecimal> pricesOf(String ticker) {
return Observable
.interval(50, MILLISECONDS)
.flatMap(this::randomDelay)
.map(this::randomStockPrice)
.map(BigDecimal::valueOf);
}
Observable<Long> randomDelay(long x) {
return Observable
.just(x)
.delay((long) (Math.random() * 100), MILLISECONDS);
}
double randomStockPrice(long x) {
return 100 + Math.random() * 10 +
(Math.sin(x / 100.0)) * 60.0;
}
前面的代码很好地构成了几个流。首先,我们生成一个固定50毫秒间隔的Long值序列。然后,我们将每个事件独立地延迟为0到100毫秒之间的随机值。最后,我们将无限长的数字转换成正弦波(使用Math.sin())随机抖动。这模拟了股票价格随时间的波动。如果您对debounce()操作符运行此流,您会注意到,只要价格低,事件通常不会频繁发生,因为我们愿意等待100毫秒。但当价格超过150美元时,debounce()会将等待时间下降至10毫秒,所以每一次好的价格更新都会被转发到下游。
Avoid starvation in debounce()
很容易想象一种情况,即debounce()操作符阻止所有事件的排放,因为它们只是出现得太频繁,而且从来没有片刻的沉默:
Observable
.interval(99, MILLISECONDS)
.debounce(100, MILLISECONDS)
这样的数据源不会发出任何事件,因为debounce()会等待100毫秒,以确保没有最近的事件。不幸的是,在这个暂停之前只有1毫秒时候,一个新的事件出现了,开始了debounce的时间。这导致了一个Observable可能会经常产生时间,但是我们可能永远也看不到他们中的任何一个!您可以将其称为特性,但实际上您可能希望不时地看到一些事件,即使是在大流量的情况下。为了防止这种情况发生,我们必须有一点创造性。
首先,我们必须发现一种情况,在这种情况下,很长一段时间内都不会出现新的事件。我们已经在第251页“Timing Out When Events Do Not Occur”一节中使用了timeout()操作符,所以我知道这部分很简单:
Observable
.interval(99, MILLISECONDS)
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS);
现在,我们至少有一个异常信号,信号是一个空闲的上游源。奇怪的是,情况恰恰相反------上游interval()操作员经常产生事件,也正是因此,导致debounce()从不向下传递-----但我们跑题了。如果事情经常出现,我们会让他们等待片刻的沉默。但是如果这种沉默太长(超过一秒),我们宣布失败,并抛出了一个TimeoutException 。与其永久地失败,我们实际上希望从上游Observablea看到一个任意值,并继续下去。任务的第一部分很简单:
ConnectableObservable<Long> upstream = Observable
.interval(99, MILLISECONDS)
.publish();
upstream
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS, upstream.take(1));
upstream.connect();
timeout()操作符有一个重载的版本,它接受超时后的回滚。不幸的是,这里有一个微妙的缺陷。在超时的情况下,我们天真地取上游开始的第一个遇到的条目,然后完成。我们真正想要的是继续从上游发射事件,仍然需要debounce()的支持。
PS:ConnectableObservable
在这里,ConnectableObservable需要结合publish()和connect()对来将Cold类型的Observable. interval()转换为Hot类型的的(参见第43页上的“hot and cold Observables”一节,以了解为什么interval()是Cold类型的,以及这意味着什么)。通过在publish()之后调用connect(),我们强制interval()操作符立即开始生成事件,即使没有任何人订阅。这意味着,如果我们在几秒钟之后订阅了这样一个Observable的内容,它就会开始在中间接收事件,并且所有订阅者都在同一时间会得到相同的事件。默认情况下,interval()是一个Cold类型的Observable,因此对于每个订阅者,无论什么时候订阅,都是从0开始。
另一种方法似乎更好:
upstream
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS, upstream
.take(1)
.concatWith(
upstream.debounce(100, MILLISECONDS)))
乍一看似乎还行。在使用debounce()之后,原始的源代码有一个超时。当超时发生时,我们会发射我们遇到的第一个条目,并继续使用相同的源,当然也是使用debounce()操作符。但是,在第一次超时的情况下,我们切换到不再有timeout()操作符以及回滚的Observable 。一种快速、肮脏、目光短浅的修复方法:
upstream
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS, upstream
.take(1)
.concatWith(
upstream
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS, upstream)))
但是,我们再次忘记在内部timeout()操作符中放置一个回滚的Observable。够了,您应该已经在这里看到了一个重复的模式。与其无限重复同样的形式 upstream → debounce → timeout() → upstream → …我们可以用递归
import static rx.Observable.defer;
Observable<Long> timedDebounce(Observable<Long> upstream) {
Observable<Long> onTimeout = upstream
.take(1)
.concatWith(defer(() -> timedDebounce(upstream)));
return upstream
.debounce(100, MILLISECONDS)
.timeout(1, SECONDS, onTimeout);
在timedDebounce方法中,onTimeout回滚的定义很复杂。我们声明,它首先从上游(原源)中抽取一个样本事件,然后递归地调用timedDebounce()方法。我们必须使用defer()操作符来避免无限递归。timedDebounce()的其余部分基本上采用了原始的上游源代码,应用了debounce()操作符,并添加了回滚onTimeout。这个回退会做完全相同的事情:应用debounce(),添加一个timeout(),然后是回滚---递归的。
如果你发现刚开始很难理解,不要沮丧。这是一个相当复杂的例子,它展示了流组合的力量,以及惰性和递归。你几乎不需要那样的复杂性,但是当你掌握了它的工作原理之后,它就相当令人满意了。利用这段代码,观察微小的变化如何极大地改变流相互作用的方式。