即使Reactive Streams用与技术无关的方法解决一个非常普遍的问题,我们也将关注RxJava以及它如何处理背压的问题。 在这一章中,我们将使用一个在小餐馆里不断洗盘子的例子。盘子被建模为带有标识符的大对象。
class Dish {
private final byte[] oneKb = new byte[1_024];
private final int id;
Dish(int id) {
this.id = id;
System.out.println("Created: " + id);
}
public String toString() {
return String.valueOf(id);
}
}
oneKb缓冲区模拟了一些额外的内存使用。Dish由服务员传递到厨房,并被视为Observable:
Observable<Dish> dishes = Observable
.range(1, 1_000_000_000)
.map(Dish::new);
range()运算符将以最快的速度生成新值。那么,如果洗碗需要一点时间,而且明显比生产的速度慢,会发生什么情况呢?
Observable
.range(1, 1_000_000_000)
.map(Dish::new)
.subscribe(x -> {
System.out.println("Washing: " + x);
sleepMillis(50);
});
令人惊讶的是没有什么不好。如果你研究输出,你会注意到range()与订阅完全一致:
Created: 1
Washing: 1
Created: 2
Washing: 2
Created: 3
Washing: 3
...
Created: 110
Washing: 110
...
这对你来说不应该感到惊讶。默认情况下,range()操作符不是异步的,因此它所生成的每个条目都将在同一个线程的上下文中直接传递给订阅者。如果订阅方很慢,它有效地阻止了可观察的内容产生更多的元素。range()不能调用订阅者的onNext(),直到前一个订阅者完成。这是可能的,因为生产者和消费者都在同一个线程中工作,并且是透明耦合的。在某种意义上,它们之间有一个最大容量为1的隐式队列。我们没有预料到的rendezvous算法。想象一下,一个餐馆里的服务员,只要那些目前正在清洗的还没有完成,他就不能留下新盘子清洗。但是当一个服务员站在那里等着洗好的碗碟的时候,就不能伺候顾客了。而没有服务员服务的话,那么就不会有新的顾客光顾饭店。
这是一个阻塞组件如何把整个系统都拖延起来的。然而,在现实生活中,生产者和消费者之间通常有一个线程边界:Observable的线程在一个线程中产生事件,而Subscriber在另一个线程中消费事件:
dishes
.observeOn(Schedulers.io())
.subscribe(x -> {
System.out.println("Washing: " + x);
sleepMillis(50);
});
在没有编译和运行代码的情况下,昂我们停下来想一下这样做会发生什么。人们可能会认为应该发生灾难,因为从range()操作符中,dishes会产生非常快的事件,而订阅者的速度非常慢,每秒只消耗20个盘子。observeOn()操作符可以连续不断地消费事件,但是Subscriber消费事件的速度是缓慢的。因此,您可能会得出结论:OutOfMemoryError是不可避免的,因为未处理的事件堆积在某个地方。幸运的是,背压在这种情况下节省了时间,RxJava在一定程度上保护了我们。这个程序的输出有些出乎意料:
Created: 1
Created: 2
Created: 3
...
Created: 128
Washing: 1
Washing: 2
...
Washing: 128
Created: 129
...
Created: 223
Created: 224
Washing: 129
Washing: 130
...
首先,一批128个盘子是由range()产生的,几乎是瞬间产生的。后来,洗盘子的过程很慢,一个接一个。某种程度上,range()操作符变得空闲。当最后一个盘子(128号盘子)被洗出来时,另一批96个盘子由range()制作,然后是缓慢的洗涤过程。显然,必须有一些聪明的机制防止range()产生过多的事件,并由订阅者控制。如果您没有看到这种机制被部署到哪里,让我们尝试自己实现range():
Observable<Integer> myRange(int from, int count) {
return Observable.create(subscriber -> {
int i = from;
while (i < from + count) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(i++);
} else {
return;
}
}
subscriber.onCompleted();
});
}
这里,我们在相同的例子使用我们自己实现的myRange,并搭配着 observeOn() 一起使用:
myRange(1, 1_000_000_000)
.map(Dish::new)
.observeOn(Schedulers.io())
.subscribe(x -> {
System.out.println("Washing: " + x);
sleepMillis(50);
},
Throwable::printStackTrace
);
这一切以灾难结束,我们甚至没有洗过任何一个Dish:
Created: 1
Created: 2
Created: 3
...
Created: 7177
Created: 7178
rx.exceptions.MissingBackpressureException
at rx.internal.operators...
at rx.internal.operators...
MissingBackpressureException将稍后解释。就目前而言,我猜这使您确信这里有一些后台机制,而我们自定义的range()的实现是缺乏的。