只有当所有用户离开时,手动跟踪所有订阅者并关闭与外部系统的连接,这是我们将会执行的一项Sisyphean任务,这么做的目的仅仅是为了欣赏稍后的习惯性的解决方案。这个想法是为了跟踪所有处于 Set<Subscriber<Status>>集合中的的订阅者,当它变为空/非空时,启动/关闭外部系统连接:
//DON'T DO THIS, very brittle and error prone
class LazyTwitterObservable {
private final Set<Subscriber<? super Status>> subscribers =
new CopyOnWriteArraySet<>();
private final TwitterStream twitterStream;
public LazyTwitterObservable() {
this.twitterStream = new TwitterStreamFactory().getInstance();
this.twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
subscribers.forEach(s -> s.onNext(status));
}
@Override
public void onException(Exception ex) {
subscribers.forEach(s -> s.onError(ex));
}
//other callbacks
});
}
private final Observable<Status> observable = Observable.create(
subscriber -> {
register(subscriber);
subscriber.add(Subscriptions.create(() ->
this.deregister(subscriber)));
}
);
Observable<Status> observe() {
return observable;
}
private synchronized void register(Subscriber<? super Status> subscriber) {
if (subscribers.isEmpty()) {
subscribers.add(subscriber);
twitterStream.sample();
} else {
subscribers.add(subscriber);
}
}
private synchronized void deregister(Subscriber<? super Status> subscriber) {
subscribers.remove(subscriber);
if (subscribers.isEmpty()) {
twitterStream.shutdown();
}
}
}
subscribers这个集合线程安全的存储当前订阅的观察者。每次出现新订阅者时,我们将其添加到一个集合中,惰性地连接到事件的底层源上。相反,当最后一个订阅者消失时,我们关闭了上游的数据源。这里的关键是始终只有一个到上游系统的连接,而不是每个订阅者都有一个连接。这是可行的,而且是相当健壮的,但是,这个实现看起来太低级而且容易出错。访问subscribers集合必须是同步的,但是集合本身也必须支持安全迭代(因为TwitterStream 注册的监听器需要遍历该集合,而取消订阅的操作则需要对集合进行修改)。调用register()必须在添加deregister()回调之前出现;否则,那么在我们注册之前就可以调用后者,这很明显是不合乎常理的。必须有一种更好的方法来实现这样一种常见场景:将单个上游数据源多路复用到多个观察者——幸运的是,至少有两个这样的机制。RxJava主要是减少这种危险的样板文件,并抽象出并发性。