信不信由你,RxJava中的并发可以由两个操作符来描述:上述的subscribeOn()和observeOn()。它们看起来非常相似,对新手来说很容易混淆,但是它们的语义实际上是非常清楚和合理的。
subscribeOn()允许选择使用哪个Scheduler来调用OnSubscribe(即在create()内的lambda表达式)。因此,create()中的任何代码都将被推到另一个线程——例如,为避免阻塞主线程。相反,observeOn()控制哪一个Scheduler被用于去调用出现在observeOn()之后的下游订阅者。例如,在io() Scheduler(通过subscribeOn(io()))上调用create(),以避免阻塞用户界面。但是,更新用户界面的小部件必须发生在UI线程中(Swing和Android都有这个约束),所以我们在操作符或者订阅者改变UI之前,我们对Android Schedulers.mainThread() 使用了observeOn()。通过这种方式,我们可以使用一个调度器来处理create()和直到第一个observeOn()出现之前的所有的操作符,但是其他的应用转换。举个例子解释会更清楚:
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
.doOnNext(x -> log("Found 1: " + x))
.observeOn(schedulerA)
.doOnNext(x -> log("Found 2: " + x))
.subscribe(
x -> log("Got 1: " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
log("Exiting");
observeOn()发生在管道链中,而这一次,与 subscribeOn()相反,observeOn()的位置非常重要。无论observeOn()上面的操作符运行在什么Scheduler上(如果有的话),observeOn()操作符下面的一切都使用该操作符提供的调度器来执行。在本例中,没有subscribeOn(),因此应用了默认值(即没有并发性):
23 | main | Starting
136 | main | Created
163 | main | Subscribed
163 | main | Found 1: A
163 | main | Found 1: B
163 | main | Exiting
163 | Sched-A-0 | Found 2: A
164 | Sched-A-0 | Got 1: A
164 | Sched-A-0 | Found 2: B
164 | Sched-A-0 | Got 1: B
164 | Sched-A-0 | Completed
上面的所有操作符都是在客户机线程内执行的,这也是RxJava中的缺省操作。但是在observeOn()下面的操作符都在observeOn()所提供的Scheduler中执行。当subscribeOn()和多个observeOn()在管道中发生时,这将变得更加明显:
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
.doOnNext(x -> log("Found 1: " + x))
.observeOn(schedulerB)
.doOnNext(x -> log("Found 2: " + x))
.observeOn(schedulerC)
.doOnNext(x -> log("Found 3: " + x))
.subscribeOn(schedulerA)
.subscribe(
x -> log("Got 1: " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
log("Exiting");
你能预测着输出吗?记住,在observeOn()操作符下面的所有内容都是在提供的Scheduler中运行的,当然直到遇到另一个observeOn()。此外,subscribeOn()可以发生在Observable到subscribe()之间的任何地方,但这一次它只影响向下到达遇到第一个observeOn()的操作符为止。
21 | main | Starting
98 | main | Created
108 | main | Exiting
129 | Sched-A-0 | Subscribed
129 | Sched-A-0 | Found 1: A
129 | Sched-A-0 | Found 1: B
130 | Sched-B-0 | Found 2: A
130 | Sched-B-0 | Found 2: B
130 | Sched-C-0 | Found 3: A
130 | Sched-C-0 | Got: A
130 | Sched-C-0 | Found 3: B
130 | Sched-C-0 | Got: B
130 | Sched-C-0 | Completed
订阅发生在schedulerA,因为这是我们在subscribeOn()中指定的。而且“Found 1”操作符也在该Scheduler内执行,因为它在第一个observeOn()之前。后来,情况变得更有趣了。observeOn()将当前Scheduler 转换为SchedulerB,而“Found 2”则使用这个。最后一个observeOn(schedulerC)对“Found 3”操作符和订阅者都有影响。请记住,请记住,订阅者在最后遇到的Scheduler的上下文中工作。
当您想要物理的解耦生产者(即Observable.create())和消费者(即订阅者)时,subscribeOn()和observeOn()一起通常工作得非常好。默认情况下,没有这种解耦,而RxJava仅使用相同的线程。仅仅使用subscribeOn()是不够的,我们只是选择一个不同的线程。observeOn()比较好,但当我们碰上同步的Observables时候,我们会阻塞客户机线程。因为大多数操作符都是非阻塞的,并且在它们内部使用的lambda表达式往往很短而且很廉价,通常在操作符的管道中只有一个subscribeOn()和observeOn()。 subscribeOn()可以放置在原始的Observable上,这样可以提高可读性,而observeOn()则可以昂在离subscribe()近的地方,这样只有订阅者才会使用该特殊Scheduler,其他操作符则依赖于subscribeOn()的Scheduler。
这里有一个更高级的程序,它利用了这两个操作符:
log("Starting");
Observable<String> obs = Observable.create(subscriber -> {
log("Subscribed");
subscriber.onNext("A");
subscriber.onNext("B");
subscriber.onNext("C");
subscriber.onNext("D");
subscriber.onCompleted();
});
log("Created");
obs
.subscribeOn(schedulerA)
.flatMap(record -> store(record).subscribeOn(schedulerB))
.observeOn(schedulerC)
.subscribe(
x -> log("Got: " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
log("Exiting");
store()只是一个简单的嵌套操作:
Observable<UUID> store(String s) {
return Observable.create(subscriber -> {
log("Storing " + s);
//hard work
subscriber.onNext(UUID.randomUUID());
subscriber.onCompleted();
});
}
事件的产生发生在schedulerA,但是每个事件都是独立地使用schedulerB 来改进并发性,这是我们在第154页的“subscribeOn() Concurrency and Behavior”中了解到的一种技术。最后的订阅在另一个调度器schedulerC 中发生。我们非常确定您现在已经了解了哪个Scheduler/thread将执行哪个操作,但只是为了以防万一(为了清晰起见,添加了空行):
26 | main | Starting
93 | main | Created
121 | main | Exiting
122 | Sched-A-0 | Subscribed
124 | Sched-B-0 | Storing A
124 | Sched-B-1 | Storing B
124 | Sched-B-2 | Storing C
124 | Sched-B-3 | Storing D
1136 | Sched-C-1 | Got: 44b8b999-e687-485f-b17a-a11f6a4bb9ce
1136 | Sched-C-1 | Got: 532ed720-eb35-4764-844e-690327ac4fe8
1136 | Sched-C-1 | Got: 13ddf253-c720-48fa-b248-4737579a2c2a
1136 | Sched-C-1 | Got: 0eced01d-3fa7-45ec-96fb-572ff1e33587
1137 | Sched-C-1 | Completed
对于具有UI的应用程序来说,observeOn()尤其重要,因为我们不想阻塞UI的事件分派线程。在Android上(参见第277页的“RxJava的Android开发”)或者Swing,一些类似于更新UI的操作必须在一个特定的线程中执行。但是在这个线程中做太多会导致UI没有响应。在这些情况下,您将observeOn()放到subscribe()附近,以便在特定Scheduler (如UI-thread)的上下文中调用订阅中的代码。然而,其他的转换,即使相当廉价,也应该在UI线程之外执行。在服务器上,observeOn()很少使用,因为并发的真正数据源通常被构建进多个Observables。这将导致一个有趣的结论:RxJava仅使用两个操作符(subscribeOn()和observeOn())控制并发,但是你使用的响应式拓展越多,您在生产代码中看到这些俩操作符的频率越低。