在我们自己实现的range()中,我们已经遇到了MissingBackpressureException。它到底是什么意思,你如何解释这个异常?想象一个Subscriber(您的订阅者,但更多时候是由某个操作符创建的),它确切地知道它想要接收多少条目;例如,buffer(N)或take(N)。这种操作符的另一个例子是observeOn()。它在这方面必须非常严格,如果上游Observable 因为某些原因,向下游推送了太多的事件,observeOn()内部的内部缓冲区就会溢出,并以MissingBackpressureException 暗示。但是为什么上游Observable推送了比请求的还要多的事件呢?因为它完全忽略了request()调用。让我们重新回到简单的range()实现:

Observable<Integer> myRange(int from, int count) {
    return Observable.create(subscriber -> {
        int i = from;
        while (i < from + count) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            } else {
                return;
            }
        }
        subscriber.onCompleted();
    });
}

阻止它的唯一方法是取消订阅,但是我们不想取消订阅,只是想稍微放慢下速度。下游操作符精确地知道他们想要接收多少事件,但是我们的源忽略了这些请求。满足下游请求的事件数量的底层机制,是通过rx.Producer来做到的。该接口在create()中插入。简要概括,OnSubscribeRange是每次有人订阅这个Observable的时候都会执行的回调。通常情况下,您会在这个接口中直接看到调用onNext(),但在考虑背压情况下不是这样的:

Observable<Integer> myRangeWithBackpressure(int from, int count) {
    return Observable.create(new OnSubscribeRange(from, count));
}
class OnSubscribeRange implements Observable.OnSubscribe<Integer> {
    //constructor...
    @Override
    public void call(final Subscriber<? super Integer> child) {
        child.setProducer(new RangeProducer(child, start, end));
    }
    }
class RangeProducer implements Producer {
    @Override
    public void request(long n) {
        //calling onNext() on child subscriber here
    }
}

这是您将在RxJava实现range()的源码中发现的代码骨架。实现Producer 是一项非常具有挑战性的任务:它必须有状态、线程安全,而且非常快的。因此,我们通常不会自己实现Producer ,但是了解它们的工作原理是有用的(参见第237页“Honoring the Requested Amount of Data”了解如何自己实现背压)。背压在内部将Rx原理颠倒。range()(以及许多其他内置的操作符)所产生的Observable不再将数据急切地推送到Subscriber 上,而是唤醒并对数据请求(即在订阅者内部调用的request(N))做出反应,然后才产生事件。此外,它确保不产生超出要求的事件。

看看我们如何在子Subscriber上设置一个Producer——这个Producer稍后将在Subscriber调用request()时间接地被调用。这就是我们如何从Subscriber建立一个反馈通道到源Observable。Observable指示其Subscriber如何请求一定数量的数据。Observable开关从push到pull-push模型,客户可以选择只请求有限数量的事件。那么,如果一些外部的Observable没有设置这样的通道该怎么办呢?当RxJava发现,年代处理一个不支持反压力的数据源的话,它可以在任何时候通过MissingBackpressureException宣布失败。然而,有有来自onBackpressure *()家族的操作符可以在一定程度上模拟背压。

最简单的onBackpressureBuffer()操作符会无条件地缓冲所有上游事件,只向下游订户提供所需的数据量:

myRange(1, 1_000_000_000)
    .map(Dish::new)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .subscribe(x -> {
        System.out.println("Washing: " + x);
        sleepMillis(50);
});

与往常一样,从底部到顶部阅读:第一个subscribe()传播到observeOn()操作符。observeOn()也必须订阅,但它不能简单地开始消费任意数量的事件。因此,它只在开始时请求一个固定的数字(128)的事件,以避免io() Scheduler 的队列溢出。onBackpressureBuffer()操作符作为一个防止数据源忽略背压的守卫。当它从下游Subscriber 接收到request(128)时,它会将请求传递给它,如果只有128个流过它,它就不会执行任何操作。但是,如果Observable只是忽略了那个请求,并简单地推动数据,不考虑背压,onBackpressureBuffer()在内部保持一个无界缓冲区。当另一个请求发自下游Subscriber 时,onBackpressureBuffer()首先耗尽它的内部缓冲区,并且只有当它几乎为空时,它才会向上游请求更多。这个聪明的机制允许observeOn()允许myRange()像是有背压一样的工作,而实际上它是onBackpressureBuffer()做的节流。不幸的是,无限的内部缓冲不是你可以轻松对待的东西。

Created: 1
Created: 2
Created: 3
Created: 4
Created: 8
Created: 9
Washing: 1
Created: 10
Created: 11
...
Created: 26976
Created: 26977
Washing: 15
Exception in thread "main" java.lang.OutOfMemoryError: ...
Washing: 16
at java.util.concurrent.ConcurrentLinkedQueue.offer...
at rx.internal.operators.OperatorOnBackpressureBuffer...

当然,您所能达到的内存溢出值可能会有所不同,尤其是有较小的事件和足够的内存的情况下,onBackpressureBuffer()可以在技术上工作起来。但在现实中,你永远不应该依赖无界的资源。无论是记忆还是你的固态硬盘。幸运的是,有一个重载版本的onBackpressureBuffer(N),它接受最大缓冲区大小:

.onBackpressureBuffer(1000, () -> log.warn("Buffer full"))

第二个参数是可选的;它是当1,000个元素的有界缓冲区满时的回调函数----尽管缓冲用户仍然无法以令人满意的速度处理事件。它不允许任何复苏,所以预计MissingBackpressureException警告信息会很快出现。但是我们至少可以控制缓冲区了,但不能控制硬件或操作系统的限制。

onBackpressureBuffer()的另一种替代方法是onBackpressureDrop(),它简单地丢弃了没有事先request()的事件。想象一下,一个餐馆里的服务生不停地往厨房里送盘子。onBackpressureBuffer()是一个有限/无限的桌子,上面是有待清洗的碗碟。另一方面,对于onBackpressureDrop(),如果现在没有洗盘子的能力,服务员会把脏盘子扔掉。这不是一个非常可持续的商业模式,至少餐厅可以继续为客户服务:

.onBackpressureDrop(dish -> log.warn("Throw away {}", dish))

这个回调是可选的,它会在每次事件必须被丢弃时通知我们,因为它在没有被请求的情况下出现的。跟踪我们扔掉了多少事件是一个好主意;这可能是一个重要的指标。最后,有一个与onBackpressureDrop()非常类似的onBackpressureLatest(),但是,保留对最后一个删除的元素的引用,以便在下游request()时,提供上游最后看到的值。

onBackpressure *()方法是用来在操作符和订阅者之间架起一座桥梁,需求背压,但是源Observables不支持它。然而,最好是使用或创建支持背压的数据源。

results matching ""

    No results matching ""