有一些操作符可以根据流的性质来消耗任意数量的内存。我们将只看其中的几个,并尝试采取一些安全措施以避免泄漏。
distinct() caching all seen events
例如,根据定义,distinct()必须在订阅之后存储所有遇到的key(即键).默认的重载distinct()的将所有看到的事件与内部缓存集进行比较。如果相同的事件(关于equals())没有出现在流中,那么它将被发射并添加到缓存中以备将来使用。该缓存从不会被驱逐,以保证同一事件不再出现。您可以很容易地想象,如果事件相当大或频繁,这个内部缓存将保持增长,导致内存泄漏。
为了演示的目的,我们将使用以下事件模拟大量数据::
class Picture {
private final byte[] blob = new byte[128 * 1024];
private final long tag;
Picture(long tag) { this.tag = tag; }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Picture)) return false;
Picture picture = (Picture) o;
return tag == picture.tag;
}
@Override
public int hashCode() {
return (int) (tag ^ (tag >>> 32));
}
@Override
public String toString() {
return Long.toString(tag);
}
}
下面的程序是针对一个非常小的内存约束环境(- mx32M:即32 MB的堆)执行的,它会以最快的速度发送大型事件:
Observable
.range(0, Integer.MAX_VALUE)
.map(Picture::new)
.distinct()
.sample(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
在运行此操作之后,OutOfMemoryError出现得非常快,因为distinct()的内部缓存不能保存更多的图片实例。在崩溃之前不久的CPU使用也相当严重,因为垃圾收集器决定释放一些空间。但是,即使不是把整个Picture 作为区分事件的key(即键),我们只使用Picture .tag,程序仍然会崩溃,只是很久以后:
distinct(Picture::getTag)
这种类型的泄漏更加危险。这个问题在我们没有注意到的情况下慢慢地升级,直到在最意想不到的时刻爆发,通常是在高负荷之下。为了证明distinct()是内存泄漏的根源,使用一个不用distinct()操作符的类似程序来检验,只是计算每秒释放多少事件,而不需要缓冲。您的里程(即您的电脑所能达到的事件总数)可能会有所不同,但您可以预期每秒处理数十万条大型消息,而不会对垃圾收集或内存造成太大的压力:
Observable
.range(0, Integer.MAX_VALUE)
.map(Picture::new)
.window(1, TimeUnit.SECONDS)
.flatMap(Observable::count)
.subscribe(System.out::println);
那么如何避免istinct()的内存泄漏呢?
避免一起使用distinct()。简单的说,这个操作符本身就很危险,尤其是使用不当的情况下。
明智地选择你的key(即我们上面所说的键)。理想情况下,它应该具有有限和小的值空间。Enum和字节是OK的,long或String可能不行。如果您不能证明给定的类型只会有非常有限的值(如Enum),那么您可能会导致内存泄漏。
考虑使用distinctUntilChanged()代替。它只跟踪最后一次看到的事件,而不是所有的事件
从一开始你真的需要唯一性吗?或者你可以放宽这个要求?也许你知道重复的东西只会在10秒钟内出现?那么就考虑在一个小的时间窗口上运行distinct():
Observable
.range(0, Integer.MAX_VALUE)
.map(Picture::new)
.window(10, TimeUnit.SECONDS)
.flatMap(Observable::distinct)
每隔10秒,我们就会启动一个新窗口(参考第214页上的“Buffering Events to a List”),并确保在该窗口中没有重复事件。window()操作符发出在每个时间窗口内发生的所有事件的Observable 。惟一的值(参考distinct()操作符的调用)在该窗口中立即发出。当10秒的窗口结束时,一个新的10秒窗口开始,但更重要的是,与旧窗口关联的缓存被垃圾收集。当然,在这10秒的窗口中,仍然可能会出现大量事件导致OutOfMemoryError,所以最好使用固定长度的窗口(e.g., window(1000) )而不是固定事件的窗口。而且,如果在一个窗口的末尾和下一个窗口的开头都出现了非独特的事件(即上一个窗口的尾巴,和新的窗口的开始,出现了相同的事件),我们将不会发现他们是重复的。这是一个你必须意识到的权衡。
Buffering events with toList() and buffer()
事实上,toList()可以消耗无限的内存是很明显的。此外,对于无限流使用toList()是没有意义的。toList()仅在上游源完成时发出一个条目——当没有完成时候,toList()将不会发出任何东西。但它将继续将所有事件聚集在内存中。使用toList()用于很长的流也是值得怀疑的。您应该找到一种方法来动态的使用这些事件,或者至少使用像take()这样的操作符来限制的上游事件的数量。
当您需要同时查看有限的Observable的所有事件时,toList()是有意义的。在很少的情况下,您可以应用谓词(如allMatch()和anyMatch())、条目计数(count()),或者将它们归纳为单个聚合值(reduce()),而不需要同时在内存中需要所有事件。一个用例可以将Observable<Observable<T>>转化为Observable<List<T>>,其中内部Observable已知是固定长度:
.window(100)
.flatMap(Observable::toList)
这相当于以下内容:
.buffer(100)
这就给我们带来了 buffer() 的效果。在使用 buffer() 之前,请仔细考虑一下是否需要一个包含某个期限内的所有事件的List<T>
。例如,可能一个可 Observable<T>就足够了,假设您需要知道在每一秒中是否有超过5个高度优先级的事件发生,并且有一个 Observable<Incident>。您想要产生一个Observable<Boolean>,如果在那一秒内发生了大量的高优先级事件,就发射true,其他情况则发射false.通过 buffer()的话,就很简单了:
Observable<Incident> incidents = //...
Observable<Boolean> danger = incidents
.buffer(1, TimeUnit.SECONDS)
.map((List<Incident> oneSecond) -> oneSecond
.stream()
.filter(Incident::isHIghPriority)
.count() > 5);
但是,window()不需要将事件缓冲到中间List中,而是在动态的中转发它们。window()对相同的任务同样方便,而且可以保持常量内存使用。
Observable<Boolean> danger = incidents
.window(1, TimeUnit.SECONDS)
.flatMap((Observable<Incident> oneSecond) ->
oneSecond
.filter(Incident::isHIghPriority)
.count()
.map(c -> (c > 5))
);
与JDK中的流相比,Observable际上拥有更丰富的API,因此您可能会发现自己将Java集合转换为Observable,只是为了更好的操作符。例如,流不支持滑动窗口或压缩。
也就是说,在可能的情况下,您应该选择window()而不是buffer(),特别是在bufer()中积累的内部列表的大小无法预测和管理的情况下。
Caching with cache() and ReplaySubject
cache()操作符是另一个明显的内存消耗者。比distinct()更糟糕的是,cache()保留了它从上游接收到的每一个事件的引用。ruguo Observable已知是固定且短的长度的话,使用ache()是有意义的。例如,当使用Observable来建模某个组件的异步响应时,使用cache()是安全且可取的。否则,每个Observer(观察者)将再次触发请求,可能会导致意外的副作用。相反,缓存长的,并且可能是无限的Observables,尤其是Hot类型的,没有什么意义。因为你可能对历史并不感兴趣。
Backpressure keeps memory usage low
还记得我们是如何将两个产生时间速度不一致的事件源压缩在一起的吗(参考83页)?如果您试图压缩这两个事件源,其中一个甚至比另一个更慢,zip()/ zipWith()操作符在等待较慢的事件时必须临时缓冲更快的流:
Observable<Picture> fast = Observable
.interval(10, MICROSECONDS)
.map(Picture::new);
Observable<Picture> slow = Observable
.interval(11, MICROSECONDS)
.map(Picture::new);
Observable
.zip(fast, slow, (f, s) -> f + " : " + s)
您可能预计到这段代码最终会以OutOfMemoryError崩溃,因为zip()会缓冲快流中的事件,等待慢流。但事实并非如此;事实上,我们几乎立即得到可怕的MissingBackpressureException。zip()(和zipWith())操作符不会盲目的以上游指令的任何吞吐量接收事件。相反,这些操作符利用了背压(参见第226页的“反压力”),并且只请求尽可能少的数据。因此,如果上游Observable是Cold类型的,并且是合理的实现的,zip()将仅仅通过请求较少的数据,而不是它在技术上可以生成的数据,来放慢快的Observable的速度。
但是,在interval()的情况下,这种机制不会这样工作。interval()操作符生成的Observable是Cold类型的,因为它只在有人订阅时才启动计数器,而每个Observer(观察者)都有它自己的独立流。然而,在我们已经订阅了interval()之后,就没有办法降低它的速度了,根据定义,它必须以一定的频率发出事件。因此,它必须忽略背压并可能导致MissingBackpressureException请求。我们所能做的就是去掉多余的事件(参见233页的“Producers and Missing Backpressure”):
Observable
.zip(
fast.onBackpressureDrop(),
slow.onBackpressureDrop(),
(f, s) -> f + " : " + s)
但是出现MissingBackpressureException,它比OutOfMemoryError好吗?好吧,缺失背压会很快就失败,而内存不足是慢慢的构建起来的,消耗了原本可以在其他地方分配的宝贵内存。但是,缺失背压也可能发生在最意想不到的时刻——例如,当垃圾回收发生时。第263页上的“Verifying emitted events”讨论了如何对背压行为进行单元测试。