在过去的几章中,我们观察了事件是如何从源Observable传播到下游的,通过一系列的操作符,向下传播到一Subscriber。在我们调用subscribe()(在某种意义上是传播)的时候,所有事件和通知都在向下移动,没有任何明显的反馈循环。这种缺乏反馈的情况会导致生产者(最主要的Observable )产生大量的事件-------压倒订阅者。因此,您的应用程序可能会在OutOfMemoryError错误中崩溃,或者充其量只能是潜伏的。

Backpressure(背压)是一种机制,它允许终端订阅者以及所有中间操作符只请求来自生产者的特定数量的事件。默认情况下,上游的Cold类型的Observable会以最快的速度产生事件。但是,如果有来自下游的请求,它应该在某种程度上“放慢速度”,并准确地产生所请求的数字。这就是为什么使用了 observeOn()操作符的情况下,会出现128这个神奇额数字的原因。但是,首先让我们看看终端订阅者如何控制反压力。

当订阅时,我们有可能实现onNext()、onCompleted()和onError()(请参阅第30页上“Subscribing to Notifications from Observable”小节)。原来有另一种回调方法可以实现::onStart():

Observable
    .range(1, 10)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onStart() {
            request(3);
        }
        //onNext, onCompleted, onError follows...
});

onStart()是当你认为它应该的时候(在任何事件或通知被传播到Subscriber之前),RxJava就会调用它。您可以在技术上使用Subscriber 的构造函数,但是对于Java中的匿名内部类,构造函数看起来非常怪异:

.subscribe(new Subscriber<Integer>() {
    {{
        request(3);
    }}
    //onNext, onCompleted, onError follows...
});

但是我们跑题了。在Subscriber内部的request(3)调用指示上游源,我们首先愿意接收多少个条目。完全跳过此调用(或调用request(Long.MAX_VALUE))就相当于请求尽可能多的事件。这就是为什么我们必须非常早地调用request()的原因;否则,流就开始发出事件,我们不能在以后减少需求量。但是当我们只请求3个事件时,range()操作符将会在推送1、2和3之后会乖乖地停止释放事件。我们的onNext()回调方法将被调用三次,并且不再调用,尽管range()操作符尚未完成。但是,作为一个Subscriber ,我们可以完全控制我们想要接收多少数据。例如,我们可能需要单独请求项条目:

Observable
    .range(1, 10)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onStart() {
            request(1);
        }
        @Override
        public void onNext(Integer integer) {
            request(1);
            log.info("Next {}", integer);
        }
        //onCompleted, onError...
});

这个例子有点傻,因为它的行为就像普通的Subscriber 一样,没有任何的背压。但它说明了如何使用背压。您可以想象一个Subscriber ,它预先缓冲了一些事件,然后在发现它方便的时候请求事件块。订阅者可能会决定在接收到更多的事件之前稍等片刻,尽管会空闲的,例如,减少对某些下游依赖的压力。在我们餐馆的例子中,服务员就是一个 Observable<Dish>,不断推送来新的脏盘子,而request(N)是厨房工作人员准备清洗一定数量的盘子。一个好的服务员不应该在没有厨房工作人员要求的情况下提供新的盘子。

也就是说,在客户端代码中直接调用request(N)是很少见的。更常见的情况是,我们在source和最终订阅者之间放置的各种操作符利用了背压来控制流经管道的数据量。例如,observeOn()必须订阅上游Observable,并将它接收到的每个事件调度到特定的Scheduler,例如Scheduler.io()。但如果上游以这样的速度生产事件,底层的Scheduler和Subscriber无法跟上,该怎么办呢?由observeOn()操作符创建的Subscriber是开启了背压的。首先,它只请求128个值。可以理解背压的上游Observable系统只发出给定数量的事件并保持空闲 -----这就是range()所做的事情。当observeOn()发现这批事件被下游的订阅者成功处理时,它请求更多。这样,尽管跨越了一个线程边界和生产者和消费者一方的异步性质,但是起码保证了消费者不会被事件淹没。

observeOn()不是唯一一个对背压友好的操作符。事实上,许多其他的操作符都在利用背压。例如,zip()只会缓冲一定数量的底层Observable发出的事件。假设只有一个压缩流是非常活跃的,这个zip()就不会受到影响。同样的逻辑也适用于我们使用的大多数操作符。

results matching ""

    No results matching ""