一个Observable自然有很多订阅者。就像发布-订阅模式一样,单个发布者可以将事件分派给多个消费者(即订阅者)。在RxJava中,Observable<T>只是一种类型化的数据结构,它可以非常短暂地存在,也可以在内存中存在许多天,只要服务器应用程序运行着。同样的故事也适用于订阅者。你可以订阅一个Observable ,消费一些事件,然后抛弃所有其他的事件。或者恰恰相反:只要Observable存在就会持续的消费push的事件,这可能会持续数小时或持续数天以上。
假设有一个Observer(观察者)预先知道它想接收多少个条目或者什么时候停止接收它们。例如,我们订阅了股票价格的变动,但是 当价格降到1美元以下时,我们不再愿意监听。显然,正如Observer(观察者)有能力订阅一样,它也应该能够在发现合适的时候取消订阅。有两种手段来支持这种需求:Subscription和Subscriber。我们来谈谈前者。我们还没有探究 subscribe()方法实际上返回了什么:
Subscription subscription =
tweets.subscribe(System.out::println);
//...
subscription.unsubscribe();
Subscription 是一个句柄,允许客户端代码通过使用unsubscribe()方法来取消订阅。此外,还可以使用isUnsubscribed()方法查询订阅的状态。当你不再想要接收更多的事件时,立即取消订阅是很重要的。这样可以避免内存泄漏和系统不必要的负载。有时,我们订阅了一个Observable ,并完全消费它所推送的事件,永远不会真正取消订阅,即使这条流是无限的。然而,在某些情况下,订阅用户来来去去,而Observable 则会永远产生事件。
还有一种方法可以请求取消订阅,这一次来自于侦听器内部。我们知道,我们可以使用Subscription 在Observer或者回调之外来控制订阅。另一方面,Subscriber<T>实现了Observer<T>和Subscription 。因此,它可以同时消费推送来的通知(事件、Completion或Error)和控制订阅。下面的代码示例订阅了所有事件,但是订阅者本身决定在一定的条件下放弃接收通知。通常,这可以通过使用内置的takeUntil()操作符来完成,但目前我们可以手动取消订阅:
Subscriber<Tweet> subscriber = new Subscriber<Tweet>() {
@Override
public void onNext(Tweet tweet) {
if (tweet.getText().contains("Java")) {
unsubscribe();
}
}
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
};
tweets.subscribe(subscriber);
当用户决定不再接收更多项目时,它可以取消订阅。作为一个练习,您可以实现一个只接收n个事件的Subscriber ,然后取消订阅。Subscriber 类的功能比这更强大,但就目前而言,它可以自己取消对Observable 的订阅。