Observable支持异步地推动多个值。这很好地符合下表的右下方,它是Iterable(或者Stream ,
List , Enumerable , etc.)的异步对偶版本和Future的多值版本。
请注意,这一节涉及到Future。它使用Future.onSuccess(callback )语法来表示其行为。不同的实现存在,如CompletableFuture、ListenableFuture或Scala Future。但是不管你做什么,都不要使用java.util.Future,这需要通过阻塞来检索一个值。
那么,为什么一个Observable 会有价值而不是Future呢?最明显的原因是,您正在处理事件流或多值响应。不太明显的原因是多个单值响应的组合。让我们看看每一个:
Event stream
事件流是非常简单的。随着时间的推移,生产者将事件推给消费者,正如这里所展示的:
// producer
Observable<Event> mouseEvents = ...;
// consumer
mouseEvents.subscribe(e -> doSomethingWithEvent(e));
但是对Future不是很友好:
// producer
Future<Event> mouseEvents = ...;
// consumer
mouseEvents.onSuccess(e -> doSomethingWithEvent(e));
onSuccess回调本可能已收到“last event,”,但仍有一些问题存在:消费者现在是否需要进行轮询?生产者会给他们排队吗?还是它们会在每次获取之间丢失吗?这种Observable在这里绝对是有益的。在缺乏Observable的情况下,回调方法将比以Future建模更好。
PS:作者所使用的并不是原生的Future,而是Google的ListenableFuture,以及Java的CompletableFuture,例子中的onSuccess(callback)是Java原生的CompletableFuture,但记住,它并不是直接使用Future,只是使用了多态罢了.所以作者的意图是表达,优先考虑Observable
Multiple values
多值响应是Observable的下一个应用。基本上,可以使用List、Iterable或Stream的任何地方,就可以使用Observable:
// producer
Observable<Friend> friends = ...
// consumer
friends.subscribe(friend -> sayHello(friend));
现在,使用Future的话:
// producer
Future<List<Friend>> friends = ...
// consumer
friends.onSuccess(listOfFriends -> {
listOfFriends.forEach(friend -> sayHello(friend));
});
所以,为什么我们要使用 Observable<Friend>这种方法呢??
如果返回的数据列表很小,那么它可能对性能没有影响,使用Future可以说是不错的选择。但是,如果列表很大,或者远程数据源必须从不同的位置获取列表的不同部分,那么使用Observable<Friend>方法可以在性能和延迟上获得收益。
最令人信服的原因是,条目可以按照接收的方式进行处理,而不是等待整个集合到达。当后端不同的网络延迟会以不同的方式影响每个条目时,这一点尤其正确,因为长尾延迟(比如面向服务的或微服务架构)和共享数据存储,实际上相当常见。如果等待整个集合,消费者将始终体验到为集合所做的聚合工作的最大延迟。如果将条目作为Observable流返回,,那么消费者将立即收到它们,“第一个条目的时间”的时间可能大大低于最后一个和最慢的条目。但是为了完成这项工作,必须牺牲流的顺序,以便可以按照服务器获取的任何顺序发出条目。如果顺序最终对消费者很重要,那么可以将排名或位置包含在项目数据或元数据中,然后客户可以根据需要对这些项进行排序或定位。
此外,它将内存使用限制为每个条目所需,而不需要为整个集合分配和收集内存。
Composition
在组合单值响应(如Future)时,多值Observable类型也很有用:
当将多个Futures合并在一起时,它们会以一个单值形式发射另一个Future,就像这样:
CompletableFuture<String> f1 = getDataAsFuture(1);
CompletableFuture<String> f2 = getDataAsFuture(2);
CompletableFuture<String> f3 = f1.thenCombine(f2, (x, y) -> {
return x+y;
});
这可能正是我们想要的,并且在RxJava中可以通过Observable.zip得到:
Observable<String> o1 = getDataAsObservable(1);
Observable<String> o2 = getDataAsObservable(2);
Observable<String> o3 = Observable.zip(o1, o2, (x, y) -> {
return x+y;
});
然而,这意味着等待直到所有的Futures 都完成,然后才发出某种东西。通常,我们希望可以在每个操作完成时,即发射这个future的值。在这种情况下,merge(或相关的flatMap)更可取。它允许将结果(即使每个Observable只发射一个值)组合到一个值流中,当它们准备好时,这些值就会被释放:
Observable<String> o1 = getDataAsObservable(1);
Observable<String> o2 = getDataAsObservable(2);
// o3 is now a stream of o1 and o2 that emits each item without waiting
Observable<String> o3 = Observable.merge(o1, o2);
Single
现在,尽管Rx Observable可以很好地处理多值流,但是单值表示的简单性对于API设计和消费来说非常好。 此外,基本的请求/响应行为在应用程序中非常普遍。出于这个原因,RxJava提供了一个Single类型,这是惰性的Future。你把它看作是一个有两个好处的Future:首先,它是懒惰的,因此它可以订阅多次,并且很容易组合,其次,它适合RxJava API,因此它可以很容易地与一个Observable交互:
例如,考虑这些访问器:
public static Single<String> getDataA() {
return Single.<String> create(o -> {
o.onSuccess("DataA");
}).subscribeOn(Schedulers.io());
}
public static Single<String> getDataB() {
return Single.just("DataB")
.subscribeOn(Schedulers.io());
}
这些可以被使用,也可以选择像这样组合:
// merge a & b into an Observable stream of 2 values
Observable<String> a_merge_b = getDataA().mergeWith(getDataB());
注意两个Singles是如何合并成一个Observable。不过还需要知道,这可能导致[A,B]或[B,A]的事件发射顺序,这取决于首先完成的是那个工作。
回到前面的例子,我们现在可以使用Single而不是Observable来表示数据获取,而是将它们合并到一个值流中:
// Observable<String> o1 = getDataAsObservable(1);
// Observable<String> o2 = getDataAsObservable(2);
Single<String> s1 = getDataAsSingle(1);
Single<String> s2 = getDataAsSingle(2);
// o3 is now a stream of s1 and s2 that emits each item without waiting
Observable<String> o3 = Single.merge(s1, s2);
使用Single而不是Observable表示“stream of one”简化了消费,因为开发人员只需要考虑Single类型的以下行为:
它可以响应一个错误
没有响应
成功响应
将此与使用者在使用Observable时候必须考虑的附加状态进行比较的话:
它可以响应一个错误
没有响应
成功地响应没有数据并终止
用一个值成功响应并终止
成功地响应多个值并终止
用一个或多个值成功响应,且永不终止(等待更多数据)
通过使用Single,可以使用我们更容易使用API,并且只有在组合成Observable情况之后,开发人员才需要考虑额外的状态。这通常是一个更好的地方,因为通常开发人员控制代码,而数据API通常来自第三方。
在202页将会重点讲解Single。
Completable
除了Single之外,RxJava还有一个Completable类型,它解决了没有返回类型的常见用例,只需要表示成功或失败的完成。经常Observable<Void>或Single<Void>最终被使用。这很尴尬,就像这里所展示的那样,是非常复杂的。
Completable c = writeToDatabase("data");
这种用例在异步写操作时很常见,不需要返回值,但是需要通知成功或失败的完成。前面的使用Completable的代码与下面类似:
Observable<Void> c = writeToDatabase("data");
Completable是两个回调的抽象,分别是完成和失败
如下:
static Completable writeToDatabase(Object data) {
return Completable.create(s -> {
doAsyncWrite(data,
// callback for successful completion
() -> s.onCompleted(),
// callback for failure with Throwable
error -> s.onError(error));
});
}
Zero to infinity
Observable可以支持从零到无限的基数(在38页的“无限流”中更多地探讨)。但对于简单和清晰的,Single是“Observable of One”,而Completable是一个“Observable of None”。
通过这些新引入的类型,我们的表最终看起来是这样的: