现在,让我们将T类型事件的有限流转换为一个只有一个List<T>类型的事件的流。当然,当上游Observable<T>完成时,该事件才会被发射:
Observable<List<Integer>> all = Observable
.range(10, 20)
.reduce(new ArrayList<>(), (list, item) -> {
list.add(item);
return list;
});
这个reduce()的例子简单地以一个空的ArrayList<Integer>(它是一个累加器)开头,并将每个发射的条目添加到这个ArrayList中。负责累加操作的这个lambda表达式必须返回一个新版本的累加器。不幸的是,List.add()的返回值并不是我们期望的List;相反,它返回布尔值,标识操作成功与否。因此我们需要显式的return语句。为了克服这种冗长,可以使用collect()操作符。它几乎完全像reduce(),但是它假定对于每个事件,我们都使用相同的可变累加器,而不是每次返回一个新的不可变的累加器(将其与不可变的BigInteger示例进行比较):
Observable<List<Integer>> all = Observable
.range(10, 20)
.collect(ArrayList::new, List::add);
collect()的另一个有用的场景是将所有事件聚合到一个StringBuilder中。在这种情况下,累加器是一个空的StringBuilder,一个操作将一个项目追加到该builder中:
Observable<String> str = Observable
.range(1, 10)
.collect(
StringBuilder::new,
(sb, x) -> sb.append(x).append(", "))
.map(StringBuilder::toString);
就像每个Observable的操作符一样,reduce()和collect()都是非阻塞的,因此生成的List<Integer>包含了所有Observable.range(10, 20)发射的数据。当上游Observable完成时,该列表才会被发射。异常也会正常传播。 将Observable<T>转换为Observable<List<T>>这种需求是如此常见,因此存在一个内置的toList()操作符。请参阅第118页上的“BlockingObservable: Exiting the Reactive World”,中的现实生活的用例。