让我们从一个例子开始。出于某种原因,我们想要转换一个上游的Observable,值接收偶数的事件,其他的事件丢弃掉。。在第211页的“Flow Control”中,我们将了解使这个任务非常简单的buffer()操作符(buffer(1,2)几乎完全符合我们的要求)。但是,我们将假装我们目前还不知道这个操作符,但是我们可以通过组合几个操作符轻松实现这个功能:

import org.apache.commons.lang3.tuple.Pair;
//...
Observable<Boolean> trueFalse = Observable.just(true, false).repeat();
Observable<T> upstream = //...
Observable<T> downstream = upstream
    .zipWith(trueFalse, Pair::of)
    .filter(Pair::getRight)
    .map(Pair::getLeft);

首先,我们生成一个无限的Observable<Boolean>j交替的发射true和false。我们可以很容易的实现这一点,创建一个固定的流,它只有[true, false],然后通过repeat()操作符来无限重复。repeat()只是从上游截取Completion通知,而不是将其传递到下游。因此,不能保证repeat()会在相同的事件序列中继续循环,但在上游是一个简单的固定流时,情况恰好是这样。参见第254页的“Retrying After Failures”,有一个相似的retry()操作符。

我们使用zipWith()操作符,将我们的上游的Observable和刚刚自定义的交替生成true和false的无限Observable做了zipping操作。然而,zipping需要一个结合两个条目的函数;这在别的语言中很简单,但是在Java中,我们通过使用Apache Commons Lang这个包来帮助我们,该包提供了一个叫Pair的类。此时此刻,我们有个值是Pair<T, Boolean>的流,每个值的右边的数据类型是一个Boolean值。接下来,我们使用filter()过滤掉Pair的第二个元素是alse的事件,值保留是true的事件,这样有效的留下了偶数事件。最后一步是解压这个Pair,取出其左侧的值,扔掉Boolean值,如果你不想引入第三方包的话,另一种实现如下:

import static rx.Observable.empty;
import static rx.Observable.just;
//...
upstream.zipWith(trueFalse, (t, bool) ->
    bool ? just(t) : empty())
.flatMap(obs -> obs)

乍一看,flatMap()看起来有点奇怪,似乎没有做任何真正重要的事情。从zipWith()转换中,我们返回一个 Observable(一个元素或空的),这导致Observable<Observable<T>> 。通过使用flatMap(),我们可以消除这个嵌套级别——毕竟,flatMap()中的lambda表达式对于每个输入元素,应该返回一个Observable ,这这恰巧就是一个Observable。

无论您选择哪种实现,这都不是可重用的。如果您需要重用操every odd element”序列的操作符,您可以复制粘贴它们,或者创建如下这样的实用方法:

static <T> Observable<T> odd(Observable<T> upstream) {
    Observable<Boolean> trueFalse = just(true, false).repeat();
    return upstream
        .zipWith(trueFalse, Pair::of)
        .filter(Pair::getRight)
        .map(Pair::getLeft)
}

但是你不能再链式的组合操作符了,换句话说,你不能和么使用:obs.op1(). odd(). op2()。不像c#(响应式扩展的起源)和Scala 10(通过implicits),Java不允许扩展方法。但是内置的compose()操作符却可以提供近似的功能。compose()以一个函数作为参数,该参数应该通过一系列其他操作符来转换上游Observable。这就是它在实践中的作用:

private <T> Observable.Transformer<T, T> odd() {
    Observable<Boolean> trueFalse = just(true, false).repeat();
    return upstream -> upstream
        .zipWith(trueFalse, Pair::of)
        .filter(Pair::getRight)
        .map(Pair::getLeft);
}
//...
//[A, B, C, D, E...]
Observable<Character> alphabet =
    Observable
        .range(0, 'Z' - 'A' + 1)
        .map(c -> (char) ('A' + c));
//[A, C, E, G, I...]
alphabet
    .compose(odd())
    .forEach(System.out::println);

odd()函数返回一个 Transformer<T, T>,他可以从Observable<T>转换Observable<T>(当然,类型可以是不同的)。因此,Transformer本身就是一个函数,所以我们可以用lambda表达式( upstream -> upstream...)来替换它。注意,当Observable被组装的时候,odd()函数被热执行了,而不是等到订阅时候才开始。有趣的是,如果你想发射偶数(2、4、6等)而不是奇数(1、3、5等),可以用False.skip(1) 来替换trueFalse这个变量

results matching ""

    No results matching ""