Subject类非常有趣,因为它继承自Observable,同时实现了Observer,但是它是一个抽象类。这意味着你不仅可以以客户端(订阅上游事件)的角度来将其视为Observable,而且还可以以提供端(通过调用onNext()来推动下游的事件)的角度来将其视为订阅者。通常,您所做的是在内部保持对Subject实例的引用,这样您就可以将事件从您喜欢的任何数据源中推送出来,对外公开是以Observable类型来公开Subject的。让我们使用Subject 重新实现流状态更新。为了进一步简化实现,我们急切地连接到外部系统,而不是通过保持对subscribers的追踪来决定是否打开/断开连接。除了简化我们的示例之外,当第一个订阅者出现时,这也有利于较小的延迟(毕竟是在初始化时候即连接到外系统,而懒加载只是将连接推迟到了第一个订阅者出现,这会适当引进一部分延迟)。事件已经在流动,我们不需要等待重新连接到第三方应用程序:
class TwitterSubject {
private final PublishSubject<Status> subject = PublishSubject.create();
public TwitterSubject() {
TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
subject.onNext(status);
}
@Override
public void onException(Exception ex) {
subject.onError(ex);
}
//other callbacks
});
twitterStream.sample();
}
public Observable<Status> observe() {
return subject;
}
}
PublishSubject是Subject 的子类。我们急切地开始接收上游系统的事件,并简单地将它们(通过调用Subject.onnext(…))推送给所有订阅者。注意我们如何简单地在observe()中返回Subject,假装它是一个简单的Observable 。现在当有人订阅时,Subscriber将在onNext()后立即接收所有后续事件-----至少在它取消订阅之前会被调用。因为Subject 在内部管理着Subscribers的生命周期,所以我们只需调用onNext(),而不用担心有多少个订阅者正在监听着
Error Propagation in Subjects
Subjects是有用的,但有许多微妙之处你必须明白。例如,在调用Subject . onError()之后,该Subject将静默地删除随后的onError通知。
Observable.create(...)看起来很复杂,不方便有效的管理,Subject是创建Observable实例的有用工具。其他类型的Subject如下:
- AsyncSubject:记住上次发射出的值,并在调用onComplete()时将其推送给订阅者。只要AsyncSubject还没有完成,除了最后一个事件,其他的都丢弃。以官文的例子解释:
- 下面的例子,观察者是不会获取到任何的事件的,因为subject并没有completion:
AsyncSubject<Object> subject = AsyncSubject.create(); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three");
- 下面的例子,观察者实惠获取到“three”这条消息:
AsyncSubject<Object> subject = AsyncSubject.create(); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onCompleted();
- 下面的例子,观察者是不会获取到任何的事件的,因为subject并没有completion:
- BehaviorSubject:在订阅事件发生后推送所有事件,就像PublishSubject一样。但是,首先它会发射在订阅之前最近发生的事件。这允许订阅者立即被告知流的状态。例如,Subject可以代表每分钟的当前温度。当客户订阅时,他将立即收到最后一次看到的温度,而不是等待几秒钟之后的下一个事件。但同一用户对历史的温度并不感兴趣,只对离订阅时间最近的最后一个温度感兴趣。即如果没有发出事件,则首先推送一个特殊的默认事件(如果提供了的话)。
- 译者注:其实这个Subject很简单,即在你订阅的同时,首先获取距离订阅时间往前的最近发生的事件(虽然可能这已经是一段时间之前发射的事件了,但是由于新的事件还没有生成,所以这就是新事件),如果没有的话,则获取default事件
ReplaySubject:它是Subject中最有趣的一个,它会缓存缓贯穿整个历史的事件。如果有人订阅,首先他会接收到一批错过的(即之前缓存了的)事件和之后的实时事件。默认情况下,自创建该Subject以来的所有事件都将被缓存。如果流是无限的或非常长(参见第315页的“内存消耗和泄漏”),就会变得危险。在这种情况下,重载版本的ReplaySubject只保留以下内容:
内存中事件的可配置数量(createWithSize())
最近事件的可配置时间窗口( createWithTime())
或者甚至是限制大小和时间(看先到达那个极限),通过createWithTimeAndSize()
Subjects 应该谨慎对待:通常共享订阅和缓存事件有更多的惯用方法——例如,可以参考“ConnectableObservable”。就目前而言,选择相对于低级的,比如 Observable.create(),甚至是更胜一筹的,例如我们之前学习的工厂方法:from()和just()
还有一件事要记住,那就是并发性。默认情况下,在Subject中调用onNext()直接传播到所有观察者的onNext()回调方法上。这些方法共享相同的名称并不奇怪。在某种程度上,调用Subject 的onNext()间接调用每个订阅者的onNext()。但是,您需要记住,根据Rx设计准则,所有对Observer的调用都必须串行化(即是连续的)因此,两个线程不能同时调用onNext()。然而,取决于你鼓励的Subject的方式,你可以很容易地打破这个规则 -----比如,从线程池中的多个线程中调用subject . onnext()。幸运的是,如果您担心这种情况,只需在Subject上调用toSerialized(),它与调用Observable. serialize()非常相似。该操作符确保下游事件以正确的顺序发生。