缓冲和移动窗口是RxJava提供的最令人兴奋的内置运算符。它们都通过一个窗口来遍历输入流,它捕获了几个连续的元素并向前移动。一方面,它们允许批处理上游源推送的值,以更有效地处理它们。在实践中,它们是灵活的、通用的工具,可以动态地收集各种数据。

buffer()操作符将实时的批量事件聚合到一个列表中。但是,与toList()操作符不同的是,buffer()发出几个列表,将一些后续事件分组,而不是仅包含一个包含所有事件的事件(就像toList()那样)。最简单形式的buffer()将上游Observable 的值分组成大小相同的列表:

Observable
.range(1, 7) //1, 2, 3, ... 7
    .buffer(3)
    .subscribe((List<Integer> list) -> {
        System.out.println(list);
    }
);

当然,在这里直接使用subscribe(system . out::println)也是对的;我们保留类型信息视为了教育目的。输出显示了来自buffer(3)操作符的三个事件:

[1, 2, 3]
[4, 5, 6]
[7]

buffer()保持接收上游事件并在缓冲区中缓存它们(因此得名buffer),直到缓冲区的大小达到3。当发生这种情况时,整个缓冲区(List<Integer>)被推送到下游。当Completion通知出现时,而内部缓冲区不是空的(但还没有到3个),它还是会被推到下游。这就是为什么我们最后会看到一个元素列表的原因。

通过使用buffer(int)操作符,您可以用更少但更大的批次,来替换几个细粒度事件。例如,如果您想减少数据库负载,您可能想要将每个事件单独存储替换为批量方式存储:

interface Repository {
    void store(Record record);
    void storeAll(List<Record> records);
}
//...
Observable<Record> events = //...
events
    .subscribe(repository::store);
//vs.
events
    .buffer(10)
    .subscribe(repository::storeAll);

后一种订阅调用Repository 上的storeAll,一次批量存储10个元素。这可能会提高应用程序的吞吐量。

buffer()有许多重载的变体。稍微复杂一点的版本允许在buffer()将该列表推到下游时,配置从内部缓冲区中删除多少个旧值。这听起来很复杂,但用更基本的术语来说,你可以通过一个特定大小的移动窗口来观察你的事件流:

Observable
    .range(1, 7)
    .buffer(3, 1)
    .subscribe(System.out::println);

这就产生了几个重叠的列表:

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7]
[7]

如果要计算一些时序数据的移动平均值,可以使用buffer(N,1)变体。下面的代码示例生成了来自正态分布的1000个随机值。之后,我们取一个100个元素的滑动窗口(一次只推进一个元素),并计算这样一个窗口的平均值。你自己运行下这个程序,会注意移动平均值比随机无序值平滑多了。

import java.util.Random;
import java.util.stream.Collectors;
//...
Random random = new Random();
Observable
    .defer(() -> just(random.nextGaussian()))
    .repeat(1000)
    .buffer(100, 1)
    .map(this::averageOfList)
    .subscribe(System.out::println);
//...
private double averageOfList(List<Double> list) {
    return list
        .stream()
        .collect(Collectors.averagingDouble(x -> x));
}

可以想象,调用buffer(N)实际上相当于buffer(N,N)。最简单形式的buffer()在它满时就会删掉整个内部缓冲区。有趣的是,buffer(int, int) 的第二个参数(它指定了缓冲区被向下推时要跳过多少个元素)可以比第一个参数更大,这样就可以有效地跳过了一些元素!

Observable<List<Integer>> odd = Observable
    .range(1, 7)
    .buffer(1, 2);
odd.subscribe(System.out::println);

这个设置转发第一个元素,然后跳过两个:第一个元素和第二个元素。然后循环重复:buffer()转发第三个元素,然后跳过第三个元素和第四个元素。实际上,输出是:[1] [3] [5] [7]。注意,奇Observable 的每个元素实际上都是仅包含一个元素的列表。您可以使用flatMap()或flatMapIterable()来返回一个简单的Observable<Integer> :

Observable<Integer> odd = Observable
    .range(1, 7)
    .buffer(1, 2)
    .flatMapIterable(list -> list);

flatMapIterable()期望一个函数,将流中的每个值(在我们的例子中,是指仅有单个元素的列表List<Integer>)转换为一个List 。恒等式转换(list -> list)在这里已经足够了。

Buffering by time periods

buffer()实际上是一个很广泛的的操作符家族(重载版本很多)。与其基于一定大小来批量处理上游事件(每个批处理的大小相同),另一种buffer()的变体可以按时间周期批处理事件。当throttleFirst()和throttleLast()会采用给定的时间内进行的第一次或者最后一次事件的时候,buffer的重载版本之一会批量处理每个时期内的所有事件。回到我们的名字例子:

Observable<String> names = just(
    "Mary", "Patricia", "Linda", "Barbara", "Elizabeth",
    "Jennifer", "Maria", "Susan", "Margaret", "Dorothy");
Observable<Long> absoluteDelays = just(
    0.1, 0.6, 0.9, 1.1, 3.3,
    3.4, 3.5, 3.6, 4.4, 4.8
).map(d -> (long) (d * 1_000));
Observable<String> delayedNames = Observable.zip(names,
    absoluteDelays,
    (n, d) -> just(n).delay(d, MILLISECONDS)
).flatMap(o -> o);
delayedNames
    .buffer(1, SECONDS)
    .subscribe(System.out::println);

接收一个事件周期参数的 buffer()重载版本(在前面的示例中,它接受了一秒的时间周期的),它会在该周期内聚集所有上游事件。因此,buffer()会收集在第一个时间段内发生的所有事件,第二个时间段内的事件,等等:

[Mary, Patricia, Linda]
[Barbara]
[]
[Elizabeth, Jennifer, Maria, Susan]
[Margaret, Dorothy]

第三个List< String >打印的是空的,因为在那个时间周期中没有出现任何事件。buffer()的一个用例是计算每个时间段的事件数;例如,每秒钟的关键事件数:

Observable<KeyEvent> keyEvents = //...
Observable<Integer> eventPerSecond = keyEvents
    .buffer(1, SECONDS)
    .map(List::size);

幸运的是,如果在一秒钟内没有事件发射,那么会产生一个空的列表,所以我们的度量中没有空白。但是,这不是最有效的方法,因为我们很快就会发现 window()操作符。

最全面的重载buffer()允许您完全控制该操作符何时开始缓冲事件,以及何时将缓冲区冲洗到下游。换句话说,您选择在哪些时间段内对上游事件进行分组。想象一下,你正在监控一些经常推送遥测数据的工业设备。数据量是压倒性的,所以为了节省一些计算能力,你决定只看某些样本。算法如下:

  • 在营业时间(9:00 - 17:00),我们每秒钟拍100毫秒长的快照(处理大约10%的数据)

  • 在营业时间之外,我们只看每5秒钟拍摄的200毫秒长的快照(4%)

换句话说,每隔一秒(或5秒),我们就会对所有事件进行100毫秒(或200毫秒)的缓冲,并发出该周期内所有事件的列表。 当您看到整个示例时,这将变得清晰。首先,我们需要一个Observable ,每当我们想要开始缓冲(分组)上游事件时,它就会发出任何值。这个Observable确实可以推送任何类型的值,但这是无关紧要的,因为只有时机最重要。我们这里所返回的java.time包的Duration对象只是一个巧合,RxJava不以任何方式使用这个值:

Observable<Duration> insideBusinessHours = Observable
    .interval(1, SECONDS)
    .filter(x -> isBusinessHour())
    .map(x -> Duration.ofMillis(100));
Observable<Duration> outsideBusinessHours = Observable
    .interval(5, SECONDS)
    .filter(x -> !isBusinessHour())
    .map(x -> Duration.ofMillis(200));
Observable<Duration> openings = Observable.merge(
    insideBusinessHours, outsideBusinessHours);

首先使用interval()操作符,我们每秒生成一个计时器,但排除那些不在营业时间内的。这样,在9点到17点之间,我们会得到一个稳定的时钟滴答作响。回想一下interval()返回的是自然的Long类型的数字;但是,我们不需要它们,所以为了方便起见,我们用固定的100毫秒来替换它们。对称的代码是在17:00到9:00之间每5秒创建一个稳定的事件流。如果您好奇isBusinessHour()是如何实现,他是用的是java.time包来实现的,实现如下:

private static final LocalTime BUSINESS_START = LocalTime.of(9, 0);
private static final LocalTime BUSINESS_END = LocalTime.of(17, 0);
private boolean isBusinessHour() {
    ZoneId zone = ZoneId.of("Europe/Warsaw");
    ZonedDateTime zdt = ZonedDateTime.now(zone);
    LocalTime localTime = zdt.toLocalTime();
    return !localTime.isBefore(BUSINESS_START)
        && !localTime.isAfter(BUSINESS_END);
}

openings流将insideBusinessHours 流和outsideBusinessHours 流merge在一起。它基本上是一个触发器,指示buffer()操作符开始从上游收集样本,而不是丢弃它们。无论从opening流中发射出什么值,都是完全不相干的。但是,我们还必须指定何时停止聚合(缓冲)事件,并将它们作为一个批处理放到一个列表中。最明显的解决方案是将从opening流发出的每个事件视为停止当前批处理的信号,并将其释放到下游,并开始另一个批处理:

Observable<TeleData> upstream = //...
Observable<List<TeleData>> samples = upstream
    .buffer(openings);

注意我们如何将精心设计的opening流传递给buffer()操作符。前面的代码示例将事件类型是TeleData的upstream数据源切片。openings 的时钟批处理来自upstream的事件。在营业时间内,每隔一秒就会创建一个新的批处理,在业务时间之外,则是批处理5秒钟内的组值。重要的是,在这个版本中,来自upstream的所有事件都被保留了,因为它们要么是一个批次,要么是另一批。但是,重载的buffer()操作符也允许标记批处理的结束:

Observable<List<TeleData>> samples = upstream
    .buffer(
        openings,
        duration -> empty()
            .delay(duration.toMillis(), MILLISECONDS));

首先要记住,opening是一个 Observable<Duration>,但是从opening中发射的事件的实际是什么值并不重要。RxJava仅仅使用这个事件来开始缓冲TeleData 实例。第二个参数是Observable,当我们想要停止采样时,它必须完成。这第二条流的完成标志着一个给定批次的结束。仔细观察:每次我们想要开始新批时,opening流都会发出一个事件。对于从opening发出的每一个事件,我们返回一个新的Observable,它应该在将来的某个时刻完成/结束。因此,例如,当opening流发射了一个值为 Duation.ofMillis(100)的事件时,我们将其转换为一个Observable,给定的批处理会在100毫秒之后结束时(因为我们代码就是这么写的)。注意,在这种情况下,一些事件可能会在连续的批处理中被删除或复制。我们来看第二个参数,即这个Observable,它负责标记给定的批处理的结束,直到在下一次批处理的开始事件之前,在这个时间间隔出现的事件被buffer()丢弃。这是我们的情况:我们每隔1秒就开始缓冲事件(或每隔5秒的业务时间,根据营业事件),但是缓冲区关闭,并在100毫秒(具体时间要看duration变量的大小)后被转发。而大多数事件在缓冲期间之间发生,因此被丢弃。

PS:我们在本节之前说过需求,即在营业事件段,做100ms的快照,并每秒触发一次。因此这里做的这些工作就是为了满足这个需求,首先opening流式一个interval的规则触发流,可以每秒发射一个事件(或者每5秒发射一个),事件的值我们通过map操作符映射成了Duration,而这个对象,不仅是我们的发射值,而且还是upstream流的缓冲开启点。然后我们通过第二个参数指定了缓冲结束点,即delay一个Duration的值大小的延迟,然后该Observable结束、完成,从而upstream流也结束缓冲,并将缓冲区的值全部作为批次发送给下游

buffer()操作符非常灵活且非常复杂。确保你对它做了一点练习,并理解前面的例子。它用于从上游源中巧妙地批处理事件,以实现分组、抽样或移动窗口功能。但是,由于buffer()需要在当前缓冲区关闭并传递到下游之前创建一个中间List,所以它可能对垃圾收集和内存使用造成不必要的压力(请参见在第315页的 “内存消耗和泄漏”)。因此,引入了window()操作符。

results matching ""

    No results matching ""