单独的Observable流既不允许并发,也不允许并行。相反,它们是通过异步Observable组合来实现的。

并行性是同时执行任务,通常在不同的cpu或机器上执行。另一方面,并发性是多个任务的组合或交叉。如果单个CPU有多个任务(如线程),它们将并发执行,但不会并行执行“时间切片”。每个线程在向另一个线程屈服(让步)之前都会获得一部分CPU时间,即使线程尚未完成。

并行执行按照定义是并发性的,但并发性并不一定是并行性的。在实践中,这意味着多线程是并发性的,但是并行性只发生在那些线程被调度并同时在不同的CPU上执行的情况下。因此,我们通常讨论并发性和并发,但并行性是并发性的一种特殊形式。

RxJava的Observable的契约是事件(onNext()、onCompleted()、onError())不能并发的发出。换句话说,一个单独的Observable流必须始终是串行化和线程安全的。每一个事件都可以从不同的线程中释放,只要不同时排放。这意味着没有交叉或同时执行onNext()。如果onNext()仍然在一个线程上执行,另一个线程不能再次调用它(否则会发生交错)。

这里有一个例子:

Observable.create(s -> {
    new Thread(() -> {
        s.onNext("one");
        s.onNext("two");
        s.onNext("three");
        s.onNext("four");
        s.onCompleted();
    }).start();
});

这段代码按顺序发出数据,所以它符合契约。(不过,请注意,一般建议不要在Observable内启动线程。相反,应该使用scheduler).

下面是一个非法的代码示例:

// DO NOT DO THIS
Observable.create(s -> {
    // Thread A
    new Thread(() -> {
        s.onNext("one");
        s.onNext("two");
    }).start();
    // Thread B
    new Thread(() -> {
        s.onNext("three");
        s.onNext("four");
    }).start();
    // ignoring need to emit s.onCompleted() due to race of threads
});
// DO NOT DO THIS

此代码是非法的,因为它有两个线程可以同时调用onNext()。这违反了契约。(同时,它需要安全地等待两个线程完成调用onComplete,如前所述,手动启动这样的线程通常是个坏主意。)

那么,如何利用并发性和/或与RxJava的并行性呢? 通过组合:

单独的Observable流始终是串行的,但是每个Observable 流都可以彼此独立地操作,因此可以并发地和/或并行地运行。这就是为什么merge和flatMap最终会在RxJava中被广泛使用的原因-------并发地组合异步流。(你可以在第67页的“Wrapping Up Using flatMap()”中了解flatMap的详细信息,并在第77页中“Treating Several Observables as One Using merge()”学习merge。)

下面是一个设计的示例,它展示了两个在单独的线程上运行异步Observable并合并在一起的的机制:

Observable<String> a = Observable.create(s -> {
    new Thread(() -> {
        s.onNext("one");
        s.onNext("two");
        s.onCompleted();
    }).start();
});
Observable<String> b = Observable.create(s -> {
    new Thread(() -> {
        s.onNext("three");
        s.onNext("four");
        s.onCompleted();
    }).start();
});
// this subscribes to a and b concurrently,
// and merges into a third sequential stream
Observable<String> c = Observable.merge(a, b);

Observable c 接收来自a和b的条目,并由于它们的异步性,发生的三件事:

  • “one” 将在 “two”之前出现
  • “three” 将在 “four”之前出现
  • 1 / 2和3 / 4之间的顺序未说明

那么为什么不允许同时调用onNext()呢?

主要因为onNext()是为了让我们人类使用,并且并发是困难的。如果onNext()可以同时调用,则意味着每个Observer(观察者)都需要对并发调用进行防御性的编码,即使是在没有预料到的情况下。

第二个原因是由于一些操作不可能并发的发射;例如,scan和reduce,这都是常见的和重要的行为。像scan和reduce这样的操作符需要串行的事件传播,这样状态可以在不支持结合律和交换律的事件的流中累积。允许并发Observable流(即允许onNext()的话)将限制可以处理的事件类型,并需要线程安全的数据结构。

PS:Java 8 Stream类型支持并发发射。这就是为什么java.util.stream.Stream需要reduce函数来变得可结合(满足结合律),因为它们必须支持并行流上的并发调用。java.util.stream包的文档的关于并行性、排序(与交换律有关)、reduce操作和结合律进一步说明了支持串行和并发发射的相同Stream类型的复杂性。

第三个原因是性能受到同步开销的影响,因为所有观察者和操作符都需要线程安全,即使大多数时间数据是串行到达的。尽管JVM经常擅长消除同步开销,但它并不总是可行的(特别是使用原子性的非阻塞算法),所以这最终是串行流的性能税。

此外,执行通用的细粒度并行性通常比较慢。并行性通常需要粗粒度地进行,比如在批处理工作中,以弥补切换线程、调度工作和重新组合的开销。在单个线程上同步执行,并利用许多内存和CPU优化来进行串行计算,效率要高得多。在列表或数组中,对于批处理并行性有合理的默认值是很容易的,因为所有的条目都是预先知道的,并且可以被分成批处理(尽管如此,在单个CPU上处理完整列表的速度通常会更快,除非列表非常大,或者每项的计算都很重要)。但是,流并不提前知道工作,它只是通过onNext()来接收数据,因此不能自动地将工作进行处理。

事实上,在RxJava v1之前。.parallel(Function f)操作符被添加进来,试图像java.util.stream.Stream.parallel(),因为这被认为是一个不错的便利。它是通过将单个Observable分割成许多Observable,每一个拆分出来的新的Observable都并行执行的,然后再将它们合并在一起,从而不破坏RxJava契约。但是,在v1之前,它被从库中删除,因为它非常混乱,而且几乎总是导致性能更差。将计算的并行性添加到事件流中,几乎总是需要进行推理和测试。也许ParallelObservable 是有意义的,因为操作符被限制为一个子集,该子集假设有结合律,但是在使用RxJava的时间里,它从来没有最终成为值得付出的努力,因为merge和flatMap的组合是处理用例的有效的构建块。

第3章将介绍如何使用操作符组合Observable,以从并发性和并行性中获益。

results matching ""

    No results matching ""