最糟糕的阻塞式API是你可能需要与需要轮询的变化一起工作。它没有提供任何机制来推送对你的改变,即使通过回调或无限期地阻塞。这个API给出的唯一机制是请求当前状态,如果您想知道它是否与以前的状态不同,则由您来判断。RxJava有一些真正非常强大的操作符,您可以应用它将给定的API重构为Rx风格的。我要你考虑的第一个例子是一个简单的方法,它交付代表状态的单个值,,例如,long getOrderBookLength()。为了跟踪更改,我们必须经常调用此方法并捕获差异。您可以在RxJava中使用非常基本的运算符组合实现此功能:
Observable
.interval(10, TimeUnit.MILLISECONDS)
.map(x -> getOrderBookLength())
.distinctUntilChanged()
首先,我们每10毫秒自动产生一个long类型的值,作为一个基本的滴答计数器。对于每一个这样的值(即每10毫秒),我们调用getOrderBookLength()。然而,上面提到的方法并没有经常改变,我们也不想让我们的订阅者充斥着大量的不相关的状态变化。幸运的是,我们可以简单地说,distinctUntilChanged()和RxJava 将会透明地跳过那些自上次调用以来没有改变的long类型的值,如下面的弹珠图所示:
我们可以进一步应用这个模式。假设您正在监视文件系统或数据库表的更改。您处理的唯一机制是获取当前文件或数据库记录的快照。您正在构建一个API,该API将通知客户每一个新的条目。显然,您可以使用 java.nio.file.WatchService或数据库触发器,但将此作为一个示范例子。这一次,我们再次定期的获取当前状态的快照:
Observable<Item> observeNewItems() {
return Observable
.interval(1, TimeUnit.SECONDS)
.flatMapIterable(x -> query())
.distinct();
}
List<Item> query() {
//take snapshot of file system directory
//or database table
}
distinct()操作符保存了通过它传递的所有条目的记录(参见“在第92页中“Dropping Duplicates Using distinct() and distinctUntilChanged()”)。如果第二次出现相同的项,则忽略它。这就是为什么我们每秒钟都能推送相同的条目列表。第一次他们被推到下游的所有订阅者。然而,当同一列表在一秒钟后出现时,所有的条目都已经看到了,因此被丢弃。如果在某个时间点,从query()返回的列表包含一个额外的条目,distinct()将会将其发射到下游,但是当下次它再次出现时候,还是会被忽略掉。这个简单的模式允许我们替换一堆Thread. sleep()调用和手动缓存定期的轮询。它适用于许多领域,如文件传输协议(FTP)轮询、web抓取等等。