publish()操作符的另一个有用的用例是在没有订阅用户的情况下强制订阅。假设我们有Observable<Status> 。在向客户公开它之前,我们希望将每个事件存储在数据库中,而不考虑是否有人订阅了它。单纯的方法是不够的:
Observable<Status> tweets = //...
return tweets
.doOnNext(this::saveStatus);
我们正在使用doOnNext()操作符,它会偷看每个流经数据流的条目并执行一些操作,比如saveStatus()。然而,记住,Observables是惰性设计的;因此,只要没有人订阅,就不会触发doOnNext()。我们想要的是一个虚假的观察者,它并不真正地倾听事件,而是迫使上游的Observables产生事件。实际上, subscribe()的重载版本正是这样做的:
Observable<Status> tweets = //...
tweets
.doOnNext(this::saveStatus)
.subscribe();
最后这个空的Subscriber会去调用observable . create(),并连接到事件的上游源。这似乎解决了这个问题,但是我们又忘记了保护自己不受多个订阅者的影响。如果我们在外部公开tweet,第二个订阅者将会第二次尝试连接到外部资源,例如打开第二个HTTP连接。通常的解决方案是使用publish().connect()二重奏,在只保留一个上游Subscriber(订阅者)的同时,立即创建一个虚拟的Subscriber(订阅者)。这是最好的例子。最后,我们将学习如何单独使用publish():
ConnectableObservable<Status> published = tweets.publish();
published.connect();
最后,我们看到ConnectableObservable在它的全部荣耀。我们可以在任何Observable上调用Observable . publish(),并得到ConnectableObservable的返回值。我们可以继续使用原始的上游Observable(前面示例中的tweet):publish()不会影响它。但是我们将重点关注返回的ConnectableObservable。任何订阅到ConnectableObservable 的观察者都将被放到一个Subscribers集合中去。只要connect()不被调用,这些订阅者就被暂停,他们永远不会直接订阅上游Observable。但是,当connect()被调用时,一个专用的中介 Subscriber(订阅者)会订阅上游Observable(即样例中的tweets),不管有多少下游订阅者出现在前面——即使没有也没关系。但是,如果ConnectableObservable 中有一些Subscriber(订阅者)被暂停的话,他们将会收到相同的通知序列。
这种机制有很多优点。假设您在应用程序中有一个Observable ,并且多个Subscriber (订阅者)都对此感兴趣。在启动时,有几个组件(例如Spring bean或ejb)订阅这Observable 并开始监听。如果没有ConnectableObservable ,那么很有可能的是,hot类型的Observable将会开始发射将被第一个订阅者消费的事件,但是Subscribers(订阅者)如果在稍后启动的话,则会错过早期的事件。如果您希望确保所有订阅者都能获得一致的世界观,那么这将是一个问题。它们所有都将以相同的顺序接收事件,不幸的是,出现的晚的Subscriber将丢失早期的通知。
这个问题的解决方案是publish()这样一个Observable,并使系统中的所有组件都可以subscribe()到它;例如,在应用程序启动期间。当您100%确定需要接收相同事件序列(包括初始事件)的所有 Subscribers(订阅者)都有机会subscribe()时,通过connect()来连接到这样的Observable。这将在Observable上游创建一个Subscriber (订阅者),并开始将事件推给所有下游Subscriber (订阅者)。下面的示例使用Spring框架,但实际上它是框架无关的。
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import rx.Observable;
import rx.observables.ConnectableObservable;
@Configuration
class Config implements ApplicationListener<ContextRefreshedEvent> {
private final ConnectableObservable<Status> observable = Observable.<Status>create(subscriber -> {
log.info("Starting");
//...
}).publish();
@Bean
public Observable<Status> observable() {
return observable;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
log.info("Connecting");
observable.connect();
}
}
@Component
class Foo {
@Autowired
public Foo(Observable<Status> tweets) {
tweets.subscribe(status -> {
log.info(status.getText());
});
log.info("Subscribed");
}
}
@Component
class Bar {
@Autowired
public Bar(Observable<Status> tweets) {
tweets.subscribe(status -> {
log.info(status.getText());
});
l og.info("Subscribed");
}
}
我们的简单应用程序首先创建一个Observable(ConnectableObservable子类)。该Observable被设计为惰性的(以create创建的),所以即使静态地创建它们也是可以的。这个Observable是publish()了的,所有的订阅者都被暂停了,直到我们调用connect()时才会开始接收任何通知。然后,我们会发现有两个@ component注解标识的组件,它们通过@Autowired注解标识需要注入这个Observable。依赖注入框架提会为我们提供的ConnectableObservable,并允许每个组件去订阅它。然而,即使是Hot类型的Observable情况下,事件也只会直到应用程序完全启动才会到达。当所有的组件都被实例化并连接在一起时,从框架发出的ContextRefreshedEvent 就可以被消费了。在这一点上,我们可以保证所有组件都有机会请求给定的Observable,并发起subscribe()。当应用程序即将启动时,我们调用connect()。这只会向底层的Observable发起一次(且至多一次)订阅,并将完全相同的事件序列转发给每个组件。修剪后的日志输出可能看起来如下(组件名称位于方括号中):
[Foo ] Subscribed
[Bar ] Subscribed
[Config] Connecting
[Config] Starting
[Foo ] Msg 1
[Bar ] Msg 1
[Foo ] Msg 2
[Bar ] Msg 2
注意Foo和Bar组件如何报告它们已经订阅了,尽管它们还没有收到任何事件。只有在应用程序完全启动后,connect()订阅了底层Observable,并开始将Msg 1和Msg 2转发到所有组件。让我们对比一下在同一场景中可以看到的一个简单的可见,在这个场景中,ConnectableObservable没有被使用,我们允许每个组件立即订阅:
[Config] Starting
[Foo ] Subscribed
[Foo ] Msg 1
[Config] Starting
[Bar ] Subscribed
[Foo ] Msg 2
[Bar ] Msg 2
你需要知道两个不同之处。首先,当Foo组件订阅时,它立即启动与底层资源的连接;它不等待应用程序启动。更糟糕的是,Bar组件会启动另一个连接(注意,连接操作发生两次)。其次,您是否看到Bar组件是从Msg 2开始的,并且从未真正得到一个由Foo接收的Msg 1?在某些情况下,消费Hot类型的Observable 有可能会出现这种问题,你一定要小心。