concat()(和实例方法concatWith())允许将两个Observable连接在一起:当第一个完成时,concat()订阅第二个Observable。重要的是,只有在第一个Observable正常完成的情况下,concat()才会订阅第二个Observable(参见第75页“Preserving Order Using concatMap()”)。concat()甚至可以接收应用了不同操作符的相同的上游Observable的参数。例如,如果我们想要从很长的流中只收到前几个和最后几个条目,我们可以像下面这样使用:
Observable<Data> veryLong = //...
final Observable<Data> ends = Observable.concat(
veryLong.take(5),
veryLong.takeLast(5)
);
译者注:因为翻译的比较拗口,其实从例子可以看出来,contact操作符的两个入参,都是veryLong,但是他们被应用了不一样的操作符
请记住,前面的代码示例订阅了veryLong这个Observable两次,这是不可取的。concat()的另一个例子是当第一个流没有发出任何东西时,可以使用另外的流来为它提供回退值:
Observable<Car> fromCache = loadFromCache();
Observable<Car> fromDb = loadFromDb();
Observable<Car> found = Observable
.concat(fromCache, fromDb)
.first();
Observables是惰性的,所以loadFromCache()和loadFromDb()都不会加载任何数据。loadFromCache()可以在缓存为空时不发射任何事件,但是loadFromDb()总是发出一个Car。concat()首先会订阅fromCache,如果它发出一个条目的话,concat()将不订阅fromDb。但是,如果fromCache是空的,concat()将会继续订阅fromDb,并从数据库加载数据。
译者注:这其实完成的是我们经常操作Redis或者其他Ignite等缓存时候做的:先查询下缓存,缓存没有再查数据库
concat()操作符实际上与merge()和switchMap()密切相关。并且.concat() 工作的方式就像 List<T>的串联/并置一样:首先,它从第一个流中获取所有条目,并且只有当它完成时,它才开始消费第二流。当然,与我们迄今遇到的所有操作符一样,concat()也是非阻塞的,它只在底层流发射事件时候才会发出事件。现在,让我们比较concat()与merge()(参见在第77页的“Treating Several Observables as One Using merge()”)以及刚刚介绍的switchOnNext()。
考虑一组People,每个人都有麦克风。每个麦克风都被建模为一个Observable<String>,一个事件代表一个单词。显然,只要他们一说话,事件机会出现。为了模拟这种行为,出于演示目的,我们将构造一个简单的Observable:
Observable<String> speak(String quote, long millisPerChar) {
String[] tokens = quote.replaceAll("[:,]", "").split(" ");
Observable<String> words = Observable.from(tokens);
Observable<Long> absoluteDelay = words
.map(String::length)
.map(len -> len * millisPerChar)
.scan((total, current) -> total + current);
return words
.zipWith(absoluteDelay.startWith(0L), Pair::of)
.flatMap(pair -> just(pair.getLeft())
.delay(pair.getRight(), MILLISECONDS));
}
前面的代码片段非常复杂,所以让我们先来研究一下。我们接收任意的字符串形式的文本,并将其split为单词的数组,并使用正则表达式来删除标点符号。现在,对于每个单词,我们计算出说这个单词需要多少时间,计算方法就是:将单词长度乘以millisPerChar。然后,我们希望可以随着时间的推移传播单词,所以我们希望在前面的例子中计算的延迟之后,每个单词出现在结果流中。显然,简单的from操作符是不够的:
Observable<String> words = Observable.from(tokens);
根据前一个单词的长度,我们希望单词以延迟出现。第一个天真的方法只是根据单词的长度,来延迟每个单词:
words.flatMap(word -> Observable
.just(word)
.delay(word.length() * millisPerChar, MILLISECONDS));
这个解决方案是不正确的。Observable 将首先发射出所有的一个字母的单词。然后,过了一会儿,所有两个字母的单词,再后面跟着的是三个字母。我们想要的是,第一个单词出现,然后第二个单词在某个延时之后,延迟时间取决于第一个单词的长度。这听起来非常复杂,但结果却相当令人愉快。首先,我们从words中创建一个辅助流,它只包含每个单词所引起的相对延迟:
words
.map(String::length)
.map(len -> len * millisPerChar);
假设millisPerChar是100,而说的话是“Though this be madness”,我们首先得到以下流:600,400,200,700。如果我们只是简单的对每一个单词delay()这一定的延迟的话,“be”单词会出现在第一个,其他单词也会被打乱。我们真正想要的是绝对延迟的累积序列,像这样:600 600 + 400 = 1000;1000 + 200 = 1200;1200 + 700 = 1900。这很容易使用scan()操作符(参见第88页“Scanning Through the Sequence with Scan and Reduce”)。
Observable<Long> absoluteDelay = words
.map(String::length)
.map(len -> len * millisPerChar)
.scan((total, current) -> total + current);
现在,当我们有一系列的单词和一系列的绝对延迟序列时,我们可以将这两个流压缩。这是zip()的一种使用情况:
words
.zipWith(absoluteDelay.startWith(0L), Pair::of)
.flatMap(pair -> just(pair.getLeft()))
这样做是有意义的,因为我们知道这两条流的大小长度完全相同,而且完全同步。嗯…几乎是吧。我们不希望第一个单词被推迟,他要立即的出现。相反,第一个单词的长度应该影响第二个单词的延迟,第一个单词和第二个单词的总长度应该会影响第三个单词的延迟,等等。您可以轻松地通过添加一个0的绝对延迟来实现这种转变。
import org.apache.commons.lang3.tuple.Pair;
words
.zipWith(absoluteDelay.startWith(0L), Pair::of)
.flatMap(pair -> just(pair.getLeft())
.delay(pair.getRight(), MILLISECONDS));
我们构建了一个由单词和单词的绝对延迟组成的元组对,通过添加了startWith(),我们确保第一个单词不会被延迟。这些配对可能如下:
(Though, 0)
(this, 600)
(be, 1000)
(madness, 1200)
...
这是我们的讲话的时间线,每一个词都伴随着它的讲话时间点。我们所要做的就是把这每一个元组对都变成一个延迟一定时间的元素的Observable.
flatMap(pair -> just(pair.getLeft())
.delay(pair.getRight(), MILLISECONDS));
经过这么多的准备,我们终于可以看到concat()、merge()和Next()的区别了。假设有三个人引用了莎士比亚的《哈姆雷特》:
Observable<String> alice = speak(
"To be, or not to be: that is the question", 110);
Observable<String> bob = speak(
"Though this be madness, yet there is method in't", 90);
Observable<String> jane = speak(
"There are more things in Heaven and Earth, " +
"Horatio, than are dreamt of in your philosophy", 100);
正如你所看到的,每个人的millisPerChar 都不是相同的(语速嘛)。如果所有的人同时发言会发生什么?RxJava可以回答这个问题:
Observable
.merge(
alice.map(w -> "Alice: " + w),
bob.map(w -> "Bob: " + w),
jane.map(w -> "Jane: " + w)
)
.subscribe(System.out::println);
输出是非常混乱的,每个人说的话互相交错。我们所听到的只是噪音,如果没有打印前面预先加上的说话人的名字,我们很难理解这句话是谁讲的:
Alice: To
Bob: Though
Jane: There
Alice: be
Alice: or
Jane: are
Alice: not
Bob: this
Jane: more
Alice: to
Jane: things
Alice: be
Bob: be
Alice: that
Bob: madness
Jane: in
Alice: is
Jane: Heaven
Alice: the
Bob: yet
Alice: question
Jane: and
Bob: there
Jane: Earth
Bob: is
Jane: Horatio
Bob: method
Jane: than
Bob: in't
Jane: are
Jane: dreamt
Jane: of
Jane: in
Jane: your
Jane: philosophy
这就是merge()的工作方式:它订阅每个人说的话的单词,并将其转发到下游,不管昂前是谁在说话。如果两个流同时发出一个事件,那么它们都立即被转发。在这个操作符中没有缓存或停止事件。
如果将merge()替换为concat()操作符,情况就大不相同了:
Alice: To
Alice: be
Alice: or
Alice: not
Alice: to
Alice: be
Alice: that
Alice: is
Alice: the
Alice: question
Bob: Though
Bob: this
Bob: be
Bob: madness
Bob: yet
Bob: there
Bob: is
Bob: method
Bob: in't
Jane: There
Jane: are
Jane: more
Jane: things
Jane: in
Jane: Heaven
Jane: and
Jane: Earth
Jane: Horatio
Jane: than
Jane: are
Jane: dreamt
Jane: of
Jane: in
Jane: your
Jane: philosophy
现在的顺序是完美的。concat(alice, bob, jane)首先订阅了alice,并将第一个Observable的事件继续转发,直到它耗尽并完成。然后,concat()切换到bob。现在再脑海里回想一下Hot和Cold类型的Observable。在merge()的用例中,来自所有流的事件都被转发到下游,因为merge()订阅了每个流。然而,concat()只订阅第一个流,因此,如果在Hot类型的Observable情况下,您可能期望得到不同的结果。当第一个Observable完成时,第二个可能会发送一个与之前的Observable完全不同的事件序列。请记住,conca()并不会缓存第二个Observable,直到第一个Observable完成为止;相反的,它只是惰性的订阅了。
switchOnNext()是一种完全不同组合操作符的方式。假设您有一个Observable<Observable<T>>,它是一个事件流,而每个事件都是一个事件流(嵌套结构嘛)。这种情况实际上是有意义的,例如,如果你有一组移动电话,它们连接着网络或者是与网络断开,在这里网络就是外部流。每个新的连接都是一个事件,但是每一个事件都是独立的心跳消息流(Observable<Ping>)。在我们的例子中,我们会有一Observable<Observable<String>> ,每个内部流都是来自alice , bob , 或者 jane的讲话。
import java.util.Random;
Random rnd = new Random();
Observable<Observable<String>> quotes = just(
alice.map(w -> "Alice: " + w),
bob.map(w -> "Bob: " + w),
jane.map(w -> "Jane: " + w));
首先,我们将alice、bob和jane这三个Observables封装到一Observable<Observable<String>>中。让我们重申:quotes这个Observable会立即发出三个事件,每个事件都是一个Observable<String>的内部流,而每一个Observable<String>内部流代表每个人所说的话。为了说明switchOnNext()的工作原理,我们将延迟内部的Observable的发射。我们并不是将Observable中的每个单词都延迟(下面的变式A),而是延迟整个Observable (下面的变式B)
//A
map(innerObs ->
innerObs.delay(rnd.nextInt(5), SECONDS))
//B
flatMap(innerObs -> just(innerObs)
.delay(rnd.nextInt(5), SECONDS))
在变式A中,在外面的流中,Observable 出现的很快,但是,它会延迟发射事件。另一方面,在变式B中,我们将整个Observable适当的延迟,它相对会晚一些才出现在外侧的Observable中。那么我么你为什么需要这么复杂的设置呢??静态的concat()和merge()操作符都可以被应用于一个Observable的固定的列表,而在 switchOnNext()情况下,针对梯状的有意义。
switchOnNext()一开始首先通过订阅外部 Observable<Observable<T>> ,它发射内部的Observable<T> 。当第一个内部Observable<T>出现时,该操作符会订阅它,并开始将T类型的事件推到下游。现在如果下一个 内部的Observable<T>出现了会发生什么?switchOnNext()通过取消订阅,丢弃掉第一个 Observable<T>,并切换到下一个(因此,名称)。换句话说,当我们有一个流,而该流内的数据也是流的话,switchOnNext()总是从最后一个内部流向下游转发事件,即使旧的流在继续转发新的事件。
这就是我们在《哈姆雷特》中引用的例子:
Random rnd = new Random();
Observable<Observable<String>> quotes = just(
alice.map(w -> "Alice: " + w),
bob.map(w -> "Bob: " + w),
jane.map(w -> "Jane: " + w))
.flatMap(innerObs -> just(innerObs)
.delay(rnd.nextInt(5), SECONDS));
Observable
.switchOnNext(quotes)
.subscribe(System.out::println);
由于这个例子的随机性,可能的输出结果如下:
Jane: There
Jane: are
Jane: more
Alice: To
Alice: be
Alice: or
Alice: not
Alice: to
Bob: Though
Bob: this
Bob: be
Bob: madness
Bob: yet
Bob: there
Bob: is
Bob: method
Bob: in't
每个人开始讲话时都是0到4秒的随机延迟。在这个特别的回合中,首先是Jane的Observable<String>,但是在引用了很少的单词后,Alice的Observable<String>出现在了外部Observable中。在这一时间点上,switchOnNext()从jane的Observable<String>那里取消订阅,而我们再也听不到这句话的其余部分了。这个可以Observable<String>的东西被丢弃和忽略, switchOnNext() 只听Alice的Observable<String>的。但是,内部Observable<String>再次被打断,因为Bob又出现了。从理论上讲,如果它们没有重叠, switchOnNext() 可以在下一次出现之前完成所有来自内部Observables的事件。
那么如果我们是将每一个内部Observable中的事件延迟,而不是延迟Observable(变式B的那样),就像我们上面所展示的变式A那样,结果会发生什么呢?好吧,三个内部的Observable会同时出现在外面的Observable的,然后switchOnNext()只会订阅其中的一个。