在第35页的“精通Observable.create()”中,我们看到默认情况下,subscribe()会使用客户端线程。回顾一下,这里是最简单的订阅,你可以找出不涉及任何线程的地方:

Observable<String> simple() {
    return Observable.create(subscriber -> {
        log("Subscribed");
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    });
}
//...
log("Starting");
final Observable<String> obs = simple();
log("Created");
final Observable<String> obs2 = obs
    .map(x -> x)
    .filter(x -> true);
log("Transformed");
obs2.subscribe(
    x -> log("Got " + x),
    Throwable::printStackTrace,
    () -> log("Completed")
);
log("Exiting");

注意日志语句的位置,并仔细研究输出,特别是关于哪个线程调用了print语句:

33 | main | Starting
120 | main | Created
128 | main | Transformed
133 | main | Subscribed
133 | main | Got A
133 | main | Got B
133 | main | Completed
134 | main | Exiting

注意:语句的顺序是完全可以预测的。首先,前面的代码片段中的每一行代码都在主线程中运行,没有线程池,也没有涉及事件的异步发射。其次,执行的顺序乍一看可能不完全清楚。

当程序启动时,它会打印"Starting ",这是可以理解的。创建了Observable<String> 的实例之后,我们看到了"Created"的消息。请注意,当我们实际订阅时,"Subscribed"出现在后面。如果不调用subscribe()调用,那么可以在observable . create()中的代码块永远不会被执行。而且,即使是map()和filter()操作符也没有任何可见的副作用,请注意“Transformed”日志消息是什么时候打印的,他甚至出现在了“Subscribed”之前。

稍后,我们将接收所有发出的事件和完成通知。最后,"Exiting"语句被打印出来,程序可以返回了。这是一个有趣的观察-----当事件以异步形式出现时,subscribe()本应该注册一个回调函数。这是默认情况下应该做的假设。但是,在这种情况下,不涉及线程,并且subscribe()实际上是阻塞。这样如何?

subscribe()和create()之间有一个固有的但隐藏的联系。每当您在Observable上调用subscribe()时,都会调用它的OnSubscribe回调方法(包装您传递给create()的lambda表达式)。它接收您的Subscriber(订阅者)作为参数。默认情况下,这种情况在相同的线程中发生,并且阻塞,所以无论您在create()中做什么,都将阻塞 subscribe()。如果您的create()方法休眠几秒钟,subscribe()也将阻塞。此外,如果在Observable.create()和您的Subscriber(lambda扮演回调)之间存在操作符,那么所有这些操作符都是由代表调用subscribe()的线程调用的。在Observable 和Subscriber 之间,默认情况下,RxJava不注入任何并发设施。这背后的原因是,Observable 通常由其他并发机制(如事件循环或自定义线程)支持,因此Rx允许您完全控制,但不会强制执行任何约定。

这个观察为subscribeOn()操作符准备了风景。通过在原始Observable到subscribe()之间插入subscribeOn(),您以声明式的方式在OnSubscribe回调函数被调用的地方选择Scheduler。无论你在create()中做什么,这个工作将被卸载到一个独立的Schedulker中,并且您的subscribe()调用不再阻塞:

log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
    .subscribeOn(schedulerA)
    .subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
35 | main | Starting
112 | main | Created
123 | main | Exiting
123 | Sched-A-0 | Subscribed
124 | Sched-A-0 | Got A
124 | Sched-A-0 | Got B
124 | Sched-A-0 | Completed

您是否看到main线程在Observable发射值之前就已经打印了“Exiting”退出了?从技术上讲,日志消息的顺序不再是可预测的,因为两个线程同时运行:main线程,它订阅了并且想退出,还有sched- a - 0线程,它会在某人订阅时发出事件。

import static java.util.concurrent.Executors.newFixedThreadPool;
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);
private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
        .setNameFormat(pattern)
        .build();
}

这些调度器将在所有示例中使用,但它们非常容易记住。三个独立的调度器,每个调度程序管理10个线程。为了使输出更好,每个线程池都有一个不同的命名模式。

在开始之前,您必须了解,在成熟的应用程序中,在Rx采用方面,subscribeOn()很少使用。通常,Observable来自于自然是异步的数据源(如RxNetty,参见第169页的“非阻塞HTTP服务器与Netty和RxNetty”)或自己应用Scheduler(比如Hystrix,参见第291页的“Hystrix管理失败”)。您应该只在特定的情况下使用subscribeOn(),比如说底层的Observable是同步的( create()会导致阻塞)。然而,subscribeOn()仍然是一个比在create()方法中手工创建线程的更好的解决方案

//Don't do this
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    Runnable code = () -> {
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    };
    new Thread(code, "Async").start();
});

前面的代码混合了两个概念:生成事件和选择并发策略。Observable 应该只负责生产逻辑,而只有客户端代码才能对并发性做出明智的决定。请记住,Observable 是惰性的,但也是不可变的,因为subscribeOn()只影响下游的订阅者,如果有人订阅完全相同的Observable ,而没有subscribeOn(),默认情况下不会涉及并发。

请记住,我们本章所关注的是已存在的应用程序并逐渐的引入RxJava。 subscribeOn()操作符在这种情况下非常的有用。但是,在您掌握了响应式扩展并开始大规模使用它们之后,subscribeOn()的使用量就会减少。在完全响应式的软件栈中,如在Netflix上发现的,subscribeOn()几乎从未使用过,但所有Observables都是异步的。大多数Observables 都来自异步源,默认情况下它们被视为异步源。因此,使用subscribeOn()的情况非常有限,主要是在对现有Api或库进行改进时。在第5章中,我们在没有显式subscribeOn()和Scheduler的情况下编写真正的异步应用程序。

results matching ""

    No results matching ""