到目前为止,我们所探索的所有操作符都是针对的每一个事件操作的(例如,filter、map或zipping)。但有时您希望聚合事件以缩小初始流或简化它。例如,考虑一个 Observable<Long>,它监视数据传输的进度。每次发送数据块时,一个long类型的值就会显示出来,显示该数据块的大小。这是一个有用的信息,但是我们真正想知道的是总共传输了多少字节。一个非常糟糕的想法是在操作符中修改全局状态值:

import java.util.concurrent.atomic.LongAdder;
//BROKEN!
Observable<Long> progress = transferFile();
LongAdder total = new LongAdder();
progress.subscribe(total::add);

前面的代码会导致非常讨厌的并发bug,就像其他共享状态一样。可以从任意线程执行操作符中的Lambda表达式,因此全局状态必须是线程安全的。我们还必须考虑到惰性。RxJava试图通过提供可组合的操作符尽可能地最小化全局状态和可变性。即使使用Rx保证,修改全局状态也很棘手。此外,我们不能再依赖于Rx操作符来组合total这个变量--------比如没定期更新用户界面。传输完成时发出的信号也更加复杂。每次都出现新的数据块,我们真正想要的是一种逐步积累数据块大小并报告当前传输总量的方法。我们假设的流应该是类似于下面这样的:

Observable<Long> progress = // [10, 14, 12, 13, 14, 16]
Observable<Long> totalProgress = /* [10, 24, 36, 49, 63, 79]
    10
        10+14=24
            24+12=36
                36+13=49
                    49+14=63
                        63+16=79
    */

第一项是按原样传播的(即10)。然而,在第二个条目(14)被传递到下游之前,它被添加到先前发出的条目(10)中,发射出24(即前两项的和)。第三项(12)再次添加到结果流产生的上一个条目中(24),发射36。这个迭代过程一直持续到上游Observable完成为止。此时,最后一项发出的是所有上游事件的总和。您可以通过使用scan()操作符轻松实现这个相对复杂的工作流:

Observable<Long> totalProgress = progress
    .scan((total, chunk) -> total + chunk);

scan()包含两个参数:一个是最后生成为值(称为累加器),另一个是上游Observable的当前值。在第一次迭代中,total仅仅是progress变量中的第一个条目的值,而在第二个迭代中,total就变成了scan()从上一个条目所得到的结果。如表3 - 1所示。

scan()就像推土机,穿过上游Observable的,并累加所有的条目。重载版本的scan()可以提供一个初始值(如果它不同于简单的第一个元素):

Observable<BigInteger> factorials = Observable
    .range(2, 100)
    .scan(BigInteger.ONE, (big, cur) ->
        big.multiply(BigInteger.valueOf(cur)));

名字叫factorials这个Observable将会生成1 , 2 , 6 , 24 , 120 , 720 …,之类的等等。注意,上游Observable的值从2开始,但下游从1开始,这是我们设置的初始值(BigInteger.ONE)。经验法则是,结果Observable的类型总是与累加器的类型相同。因此,如果您不提供自定义累加器的初始值,则从scan()返回的类型T将不会改变。否则(就像在我们的阶乘例子中),结果是Observable<BigInteger>,因为BigInteger是初始值的类型。显然,这种类型不能在整个扫描过程中发生变化。

有时候,我们不关心中间结果,只关心最后的那个结果。例如,我们要计算传输的总字节数,而不是中间的传输进程。或者,我们希望每当向一个可变数据结构添加一个条目时候,可以累计所有值,比如ArrayList数据结构。reduce()运算符正是为此而设计的。一个非常明显的警告:如果您的序列是无限的,那么scan()将为每个上游事件持续发射事件,而reduce()将不会发出任何事件。假设您有一个带getAmount()方法的CashTransfer 对象的Observable的源,返回BigDecimal。我们想计算所有转帐的总金额。下面两个转换是等价的。他们迭代所有的转账和并把转账的金额进行累加,从0开始累加。

Observable<CashTransfer> transfers = //...;
Observable<BigDecimal> total1 = transfers
    .reduce(BigDecimal.ZERO,
        (totalSoFar, transfer) ->
            totalSoFar.add(transfer.getAmount()));

Observable<BigDecimal> total2 = transfers
    .map(CashTransfer::getAmount)
    .reduce(BigDecimal.ZERO, BigDecimal::add);

两个转换都产生相同的结果,但是第二个转换看起来更简单,尽管使用了两个步骤。这是另一个选择更小、更可组合的转换的原因。您还可以看到reduce()基本就是scan()操作的最后一步。事实上,你可以把reduce操作符实现如下:

public <R> Observable<R> reduce(
            R initialValue,
            Func2<R, T, R> accumulator) {
    return scan(initialValue, accumulator).takeLast(1);
}

正如您所看到的,reduce()扫描了所有的元素,但是除了最后一个值,其他的值都删掉了(参见94页的“Slicing and Dicing Using skip(), takeWhile(), and Others”)。

results matching ""

    No results matching ""