一般来说,Observable将是异步的,但这并不是强制的。 Observable也可以是同步的,实际上默认是同步的。RxJava 除非被要求,否则不会增加并发性。一个同步的Observable将被订阅,使用订阅者的线程发出所有数据,并完成(如果是有限的)。通过阻塞网络I / O所支持的Observable ,将同步阻塞订阅线程,然后在阻塞网络I/O返回时通过onNext()发出所有数据。

例如,以下是完全同步的:

Observable.create(s -> {
    s.onNext("Hello World!");
    s.onCompleted();
}).subscribe(hello -> System.out.println(hello));

你将会在35页的“Mastering Observable.create()”中学习更多的 Observable.create的细节,在30页的““Subscribing to Notifications from Observable”学习更多的 Observable.subscribe细节。

现在,你可能会想,这通常不是响应式系统的期望行为,你是对的。以同步阻塞I/O方式来使用Observable 不是一个好的方式(如果确实需要使用阻塞I / O来做某些操作,则需要用线程来做异步调用)。但是,有时同步地从内存缓存中获取数据并立即返回数据是适当的。前一个示例中所示的“Hello World”示例不需要并发性,如果将异步调度添加到它中,实际上会慢得多。因此,通常重要的标准是Observable 事件生产是阻塞还是非阻塞,而不是同步或异步。“Hello World”示例是非阻塞的,因为它从不阻塞线程,因此它是Observable 正确的使用方式(尽管是多余的)。

RxJava的Observable与异步与同步有关,以及并发是否存在,以及它来自何处。这是通过设计,并允许Observable的实现来决定什么是最好的。以及为什么这会是有用?

首先,并发可以在很多地方产生,而不仅仅是线程池。如果数据源已经是异步的,因为它位于事件循环中,那么RxJava不应该增加调度开销,也不应该强制执行特定的调度策略。并发可以来自线程池、事件循环、actors等等。可以添加,也可以来自数据源。RxJava与异步源自何处无关。

其次,使用同步行为有两个很好的理由,我们将在下面的小节中看到。

In-memory data

如果数据存在于本地内存缓存中(检索时间通常是微妙或者纳秒级别的常量值),那花费调度的成本使其异步化是没有意义的。Observable可以同步地获取数据并在订阅线程上发出它,如下所示:

Observable.create(s -> {
    s.onNext(cache.get(SOME_KEY));
    s.onCompleted();
}).subscribe(value -> System.out.println(value));

当数据可能或可能不在内存中时,选择调度的话就是比较好的方案了。如果是在内存中,则同步发出;如果不是,则异步执行网络调用并返回数据。这种选择可以在Observable的范围内有条件地存在。

// pseudo-code
Observable.create(s -> {
    T fromCache = getFromCache(SOME_KEY);
    if(fromCache != null) {
        // emit synchronously
        s.onNext(fromCache);
        s.onCompleted();
    } else {
        // fetch asynchronously
        getDataAsynchronously(SOME_KEY)
            .onResponse(v -> {
                putInCache(SOME_KEY, v);
                    s.onNext(v);
                    s.onCompleted();
                })
                .onFailure(exception -> {
                    s.onError(exception);
                });
    }
}).subscribe(s -> System.out.println(s));

Synchronous computation (such as operators)

保持同步的更常见原因是通过操作符对流进行组合和转换。RxJava经常要使用大量操作符来操作、组合和转换数据,例如map()、filter()、take()、flatMap()和groupBy()。大多数这些操作符是同步的,这意味着当事件经过时,它们在onNext()中同步执行计算。

由于性能原因,这些操作符是同步的。以这个为例:

Observable<Integer> o = Observable.create(s -> {
    s.onNext(1);
    s.onNext(2);
    s.onNext(3);
    s.onCompleted();
});
o.map(i -> "Number " + i)
    .subscribe(s -> System.out.println(s));

如果map操作符默认是异步的,那么(1、2、3)每个数字将被安排在执行字符串连接的(“number”+ i)同一个线程上。由于调度、上下文切换等,这是非常低效的,通常具有不确定性延迟。

这里需要理解的重要一点是,大多数Observable的函数管道都是同步的(除非特定的操作符需要异步,比如timeout或observeOn),而Observable的函数本身可以是异步的。这些主题在第159页的“声明性并发性与observeOn()”和第251页“Timing Out When Events Do Not Occur”中得到了更深入的处理。

下面的例子演示了这种同步和异步的混合:

Observable.create(s -> {
    ... async subscription and data emission ...
    })
    .doOnNext(i -> System.out.println(Thread.currentThread()))
    .filter(i -> i % 2 == 0)
    .map(i -> "Value " + i + " processed on " + Thread.currentThread())
    .subscribe(s -> System.out.println("SOME VALUE =>" + s));
System.out.println("Will print BEFORE values are emitted")

在这个例子中,Observable 是异步的(它与subscriber订阅者的线程不同),所以subscribe是非阻塞的,最后println前将在事件被传播之前,将会打印“SOME VALUE ⇒”出来。

但是,filter()和map()函数在发出事件的调用线程上同步执行。这通常是我们想要的行为:一个带有高效的事件同步计算的异步管道(Observable和组合的操作符)。

因此,Observable类型本身支持同步和异步具体实现,这是通过设计实现的。

results matching ""

    No results matching ""