当使用buffer()时,我们一次又一次地构建List实例。为什么我们要构建这些中间Lists,而不是在动态的消耗事件呢?这就是window()操作符变得有用的地方。如果可能,您应该选择window()而不是buffer(),因为后者在内存使用方面的预测更不容易预测(即对无法准确的预测内存)。window()操作符非常类似于buffer():它有类似的重载版本,包括下面的版本:
接收int,将事件从数据源分组到固定大小的列表中
接收时间单元,在固定时间内分组事件
接收自定义的Observable ,标记每个批处理的开始和结束
那么,有什么区别呢?还记得计算给定数据源中每秒发生多少事件的例子吗?让我们再看一看:
Observable<KeyEvent> keyEvents = //...
Observable<Integer> eventPerSecond = keyEvents
.buffer(1, SECONDS)
.map(List::size);
我们将每秒发射一次事件的Observable<KeyEvent>,批处理为Observable<List<KeyEvent>>。下一步,我们将列表映射到它的大小。这是相当浪费的,特别是如果每一秒发生的事件数目是有意义的:
Observable<Observable<KeyEvent>> windows = keyEvents.window(1, SECONDS);
Observable<Integer> eventPerSecond = windows
.flatMap(eventsInSecond -> eventsInSecond.count());
与buffer()相反,window()返回一个 Observable<Observable<KeyEvent>> 。想一下。我们接收的是一条流的流(即事件是流),而不是接收每个都包含一个批次的固定列表。每当一个新的批处理开始(前面的例子中的每一秒),一个新的Observable<KeyEvent>值出现在外部流中。我们可以进一步转换所有这些内部流,但是为了避免双重包装,我们使用flatMap()。flatMap()接收每个缓(本例中是指:Observable<KeyEvent>)作为参数,并假定返回另一个Observable。count()操作符(参考94页的“Slicing and Dicing Using skip(),takeWhile(), and Others”)将Observable<T>转换为Observable<Integer>,而这个Observable仅发射一个值。该值表示原始Observable的事件的数量。因此对于我们产生的每一个一秒钟的批处理,会出现(或者说是发射)这一秒钟之内发生的事件的次数;但是没有内部缓冲;count()操作符在传递过程中会对事件进行计数。