无限数据结构是一个重要的概念。计算机内存是有限的,因此有一个无限的列表或流听起来是不可能的。但是RxJava允许您动态地产生和消费事件。传统的队列可以被视为一个无限值的源,尽管没有把它们全部留在内存中。也就是说,您将如何使用create()来实现这样一个无限的流呢?例如,让我们构建一个可以产生所有自然数的Observable :

//BROKEN! Don't do this
Observable<BigInteger> naturalNumbers = Observable.create(
    subscriber -> {
        BigInteger i = ZERO;
        while (true) { //don't do this!
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    }
);
naturalNumbers.subscribe(x -> log(x));

在任何代码库中while(true)的存在应该会触发一个警报铃。起初看起来还可以,但是您应该很快意识到这个实现已经被破坏了。但不是因为它是无限的——事实上,无限的Observable 是完全可以的,而且是非常有用的。当然,只要它们被正确地实现。当您subscribe()订阅时,create()中的lambda表达式是在当前线程的上下文中调用的。而且因为这个lambda永远不会结束,所以subscribe()块会无限地进行。但是,您可能会问,“订阅不应该是异步执行的吗?这不是要比在客户端线程中同步执行更好吗?”这是一个有效的问题,所以让我们花些时间来介绍并发性:

Observable<BigInteger> naturalNumbers = Observable.create(
    subscriber -> {
        Runnable r = () -> {
            BigInteger i = ZERO;
            while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
            }
        };
        new Thread(r).start();
});
log("starting")
naturalNumbers.subscribe(s -> log(s))

下面放译者的一点测试log:

main: starting
Thread-0: 0
Thread-0: 1
Thread-0: 2
Thread-0: 3
Thread-0: 4
Thread-0: 5
Thread-0: 6
Thread-0: 7
Thread-0: 8
Thread-0: 9

与其在客户端线程中直接运行阻塞循环,不如我们还生成一个自定义线程,并直接从那里发射事件。幸运的是, subscribe()不再阻塞客户端线程,因为它所做的所有工作只是在派生出一个线程。x -> log(x) 这个回调的所有调用都是从后面的自定义线程中执行的(看测试log,很明显的打印出了线程名字)。现在想象一下,我们对所有的自然数都不感兴趣(毕竟有太多的自然数),但仅仅是前几个我们比较感兴趣。而且现在我们已经知道如何停止接收Observable 通知——取消订阅:

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
//after some time...
subscription.unsubscribe();

如果你注意细节,你可能会注意到可疑的while(true)循环被如下所代替:

while (!subscriber.isUnsubscribed()) {

对于我们所做的每一次迭代,我们都需要确保有人确实在监听。当用户决定停止订阅,那么subscriber.isUnsubscribed()条件会告诉我们,所以我们可以安全地完成流并退出Runnable,有效地停止例了线程。显然,每个订阅者都有自己的线程和循环,因此当一个订阅者决定取消订阅时,其他人将继续接收他们独立的事件流。尽管创建自己的线程不是一个好的设计决策,而且RxJava有更好的声明性工具来处理并发性,但是前面的代码示例展示了如何正确处理订阅事件。

建议在subscriber(订阅者)不再希望接收新事件后,尽可能频繁地检查 isUnsubscribed()标志以避免发送事件。此外,当生产事件的成本高昂时,当没有人想要的时候却迫不及待地发送它们是毫无意义的。尽管在create()中生成自己的线程并没有什么本质上的错误,但是它很容易出错,而且很糟糕。在第140页的“RxJava多线程”中,我们探索了声明性并发和自定义调度程序,允许您编写并发代码,而不需要真正地与线程交互。

只要事件被相对频繁地推送,在发送事件之前立即处理取消订阅是可以的。但是想象一下一个事件很少出现的情况。Observable只能在其试图将某个事件推到订阅者时,会去确定是否取消订阅。

以下图中的这个工厂方法为例:delayed(x) 创建一个Observable ,在休眠10秒钟后发出值x。它类似于Observable 的just(x),但是delayed方法提供有额外的延迟功能。我们已经知道需要使用额外的线程,即使它不是最佳的使用模式:

static <T> Observable<T> delayed(T x) {
    return Observable.create(
        subscriber -> {
            Runnable r = () -> {
                sleep(10, SECONDS);
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(x);
                    subscriber.onCompleted();
                }
            };
            new Thread(r).start();
        });
}
static void sleep(int timeout, TimeUnit unit) {
    try {
        unit.sleep(timeout);
    } catch (InterruptedException ignored) {
        //intentionally ignored
    }
}

这个简单的实现生成了一个新线程,然后休眠10秒钟。一个更健壮的实现至少应该使用 java.util.concurrent.Schedule

dExecutorService。这是为了教育目的。10秒钟后,我们条件判断以确保有订阅者在监听,如果是这样的话,我们就会发出一个事件并完成。但是,如果订阅者在订阅之后一秒钟就决定取消订阅,而在该事件应该被发出之前,该怎么办呢?嗯,没什么。后台线程继续休眠剩余的9秒,然后才刚刚意识到订阅者已经过去了。这就是困扰我们的问题;将资源保持在额外的9秒看来是浪费。假设这是一个昂贵的实时数据服务连接,我们按秒支付着使用,但是却很少发生事件。在等待几秒甚至几分钟然后才刚刚意识到已经没有人订阅了,我们应该终止连接,这听起来是最不理想的。

幸运的是,有了subscriber实例,我们可以在它取消订阅后立即通知我们, 尽可能快地清理资源,而不是在下一个消息出现时:

static <T> Observable<T> delayed(T x) {
    return Observable.create(
        subscriber -> {
        Runnable r = () -> {/* ... */};
        final Thread thread = new Thread(r);
        thread.start();
        subscriber.add(Subscriptions.create(thread::interrupt));
    });
}

最后一行是至关重要的,但是其他的都是一样的。后台线程已经运行了——确切地说,是休眠10秒钟。但是,在生成一个线程之后,我们要求subscriber 如果它取消了订阅的话,则通过调用一个回调函数来通知我们,并通过Subscriber.add()注册。这个回调基本上只有一个目的:中断线程。interrupt()的作用是在sleep()中抛出一个InterruptedException(),过早地打断我们的10秒暂停。sleep()在吞下异常后优雅地退出。然而,此时subscriber.isUnsubscribed()返回false,没有发送事件。线程立即停止,没有资源被浪费。您可以使用相同的模式来执行任何清理工作。但是,如果您的流产生一个稳定的、频繁的事件流,那么您很可以不需要显式的回调,想之前那样,等待下一条事件到来时候,就会执行isUnsubscribed()检查,自然也就停止事件发射了 。

为什么不应该在create()中使用显式线程,还有另外一个原因。第4.2节的Rx设计准则中要求:假设以串行方式调用observer实例,即要求订阅者不能以并发方式接受通知。当涉及到显式线程时,很容易违反这一要求。这种行为与actor类似,例如在Akka工具包中,每个actor一次可以处理一个消息。这样的假设允许编写观察者,就好像它们是同步的,在大多数线程中都可以访问。尽管可以来自多个线程的事件,但这是正确的。Observable的自定义实现必须确保满足该契约。考虑到这一点,请查看下面的代码,该代码将对多个数据块进行并行加载:

Observable<Data> loadAll(Collection<Integer> ids) {
    return Observable.create(subscriber -> {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        AtomicInteger countDown = new AtomicInteger(ids.size());
        //DANGER, violates Rx contract. Don't do this!
        ids.forEach(id -> pool.submit(() -> {
            final Data data = load(id);
            subscriber.onNext(data);
            if (countDown.decrementAndGet() == 0) {
                pool.shutdownNow();
                subscriber.onCompleted();
            }
        }));
    });
}

这段代码,除了意外地非常复杂,还违反了一些Rx原则。也就是说,它允许同时从多个线程调用订阅者的onNext()方法。其次,您可以通过应用惯用的RxJava操作符(如merge()和flatMap()),来避免复杂性,但是我们将在77页的“Treating Several Observables as One Using merge()”中实现。好消息是,即使有人很糟糕地实现了Observable,我们也可以通过应用serialize()操作符来轻松地修复它,例如loadAll(…). serialize()。该操作符确保事件被串行化和排序。它还强制在Completion(完成)或Error(错误)之后不再发送事件。

创建Observables的最后一个我们还没涉及过的方面是错误传播。到目前为止,我们已经了解到,Observer<T> 可以接收T类型的值,对于Completion和Error事件是可选的。但是如何将Error推送到下游所有订阅者呢?在create()中将整个表达式以try - catch块中包装起来是一种很好的做法。Throwable 应该向下游进行传播,而不是被录或重新抛出,如下所示:

Observable<Data> rxLoad(int id) {
    return Observable.create(subscriber -> {
        try {
            subscriber.onNext(load(id));
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    });
}

这个额外的try- catch块对于传播可能抛出的异常是必要的,例如,load(id)。否则,RxJava将尽力至少打印出标准输出的异常,但是要构建有弹性的流,异常需要被视为一等公民,而不不只是语言中没有人真正理解的额外特性。

用一个返回值和一个try - catch语句完成组装一个Observable 的模式是非常普遍的,因此引入了一个新的内置操作符:fromCallable():

Observable<Data> rxLoad(int id) {
    return Observable.fromCallable(() ->
        load(id));
}

它在语义上是等价的,但是比create()更短,并且有一些其他的好处,您稍后会发现。

results matching ""

    No results matching ""