让我们回顾一下:我们对底层资源有一个单独的句柄;例如,连接到Twitter状态更新的HTTP连接。然而,一个Observable推送这些将在多个订阅者之间共享的事件。因此,每个订阅者都开启自己的连接。这是非常浪费:

Observable<Status> observable = Observable.create(subscriber -> {
    System.out.println("Establishing connection");
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    //...
    subscriber.add(Subscriptions.create(() -> {
        System.out.println("Disconnecting");
        twitterStream.shutdown();
    }));
    twitterStream.sample();
});

当我们尝试使用这个可观察性时,每个订阅者建立一个新的连接, 就像这样:

Subscription sub1 = observable.subscribe();
System.out.println("Subscribed 1");
Subscription sub2 = observable.subscribe();
System.out.println("Subscribed 2");
sub1.unsubscribe();
System.out.println("Unsubscribed 1");
sub2.unsubscribe();
System.out.println("Unsubscribed 2");

下面是输出:

Establishing connection
Subscribed 1
Establishing connection
Subscribed 2
Disconnecting
Unsubscribed 1
Disconnecting
Unsubscribed 2

这一次,为了简化,我们使用了一个无参数 subscribe()方法,该方法触发订阅,但会删除所有事件和通知。在花费了几乎一半的章节与这个问题进行了斗争并熟悉了大量RxJava特性之后,我们终于可以介绍最可伸缩和最简单的解决方案:publish(). refcount()对:

lazy = observable.publish().refCount();
//...
System.out.println("Before subscribers");
Subscription sub1 = lazy.subscribe();
System.out.println("Subscribed 1");
Subscription sub2 = lazy.subscribe();
System.out.println("Subscribed 2");
sub1.unsubscribe();
System.out.println("Unsubscribed 1");
sub2.unsubscribe();
System.out.println("Unsubscribed 2");

这次的输出就与我们预想的一致了:

Before subscribers
Establishing connection
Subscribed 1
Subscribed 2
Unsubscribed 1
Disconnecting
Unsubscribed 2

直到我们实际获得第一个Subscriber (订阅者)时,才会建立连接。但是,更重要的是,第二个Subscriber (订阅者)不启动一个新的连接,它甚至不触及原始的Observable。publish().refCount()串联封装了底层的Observable并拦截了所有订阅。稍后我们将解释为什么我们需要两个方法以及单独使用publish()的方法。目前,我们将重点讨论refCount()。这个操作符所做的就是计算我们目前有多少活跃用户,就像历史垃圾收集算法中的引用计数一样。当这个数字从0变到1时,它订阅到上游的Observable。超过1的任何数字都将被忽略,而上游的Subscriber (订阅者)也只是在所有下游Subscriber (订阅者)之间共享。然而,当最后一个下游Subscriber (订阅者)取消订阅时,计数器从1下降到0,然后refCount()知道它必须立即取消订阅。值得庆幸的是,refCount()正是完成了我们用LazyTwitterObservable手工实现的功能。您可以使用publish(). refcount()二重唱来允许共享单个Subscriber (订阅者),同时保持惰性。这对操作符非常频繁地使用,因此有一个别名:share()。请记住,如果取消订阅后不久就订阅,share()仍然执行重新连接,就像根本没有缓存一样。

results matching ""

    No results matching ""