你没有义务要完全读完整个流,尤其是在处理Hot类型的无限的Observable时候。事实上,切分Observable,并只消费其中一个子集,这是一个很常见的做法。本节中的大多数操作符都有示例,除非它们遵循最小讶异原则。然而,像take或last这样的操作符太有用了,不能省略。以下是这些操作符的非详细列表:

take(n) and skip(n):

take(n)操作符将会在上游发射了前n事件后,便将其截断,然后取消订阅(如果上游没有n项,则提前完成)。而skip(n)是完全相反的;它丢弃了最初的n个元素,并上游发射的n + 1事件开始。两个操作符都是相当自由的:负数被视为零,超过Observable的大小并没有被视为错误:

Observable.range(1, 5).skip(3); // [4, 5]
Observable.range(1, 5).skip(5); // []

takeLast(n) and skipLast(n)

另一种自描述的操作符对。takeLast(n)只在流完成之前从流中发出最后的n个值。在内部,这个操作符必须保存最后一个n个值的缓冲区,当它收到Completion通知时,它会立即释放整个缓冲区。将takeLast()应用在无限流是没有意义的,因为它永远不会发出任何东西——流永远不会结束,所以没有最后的事件。另一方面,skipLast(n)发射除了最后n个事件之外的所有其他上游事件。在内部,skipLast()只能在从上游接收到第n + 1元素时才会发出第一个值,当它接收到n + 2时,就会发出第二个值,以此类推

Observable.range(1, 5).takeLast(2); // [4, 5]
Observable.range(1, 5).skipLast(2); // [1, 2, 3]

first() and last()

frist()和last()操作符可以通过take(1). single()和takeLast(1).single()来实现,因此,它们几乎可以描述它们的行为。额外的single()操作符确保下游的Observable只能发出一个值或异常。此外,first()和last()都有重载的版本。不是返回第一个/最后一个值,而是返回满足条件判断的第一个或者最后一个元素。

takeFirst(predicate)

takeFirst(predicate)操作符可以用filter(predicate).take()表示。这个和first(predicate)之间的惟一区别是,在缺少匹配值时,它不会因为NoSuchElementException而中断。

takeUntil(predicate) and takeWhile(predicate)

takeUntil(predicate)和takeWhile(predicate)是密切相关的。takeUntil()从源Observable发射值,但在发出的值与谓词匹配了的话,就会完成Observable并取消订阅。takeWhile()则反之,只要它们匹配给定的谓词,就会开始向下游推送事件。因此,惟一的区别是takeUntil()将会发射出与谓词所匹配的值,而takeWhile()则不会。这些操作符非常重要,因为它们提供了根据所发射的事件,有条件的判断是否需要取消对Observable的订阅。否则,操作符需要以某种方式与Subscription 实例进行交互(事例参考32页的“Controlling Listeners by Using Subscription and Subscriber<T>”),而当操作被调用的时候,这是不被允许的:

Observable.range(1, 5).takeUntil(x -> x == 3); // [1, 2, 3]
Observable.range(1, 5).takeWhile(x -> x != 3); // [1, 2]

elementAt(n)

通过索引提取特定的条目是相当少见的,但是您可以使用内置的elementAt(n)操作符。它是很严格的,当索引太长超过上游数据的长度,或者索引是负数的话,它会导致发射一个IndexOutOfBoundsException。当然,它返回与上游相同T类型的Observable<T>

…OrDefault() operators

这一节中的许多操作符都是严格的,并且可能导致抛出异常—例如,当上游Observable是空的的时候会用first()。在这种情况下,RxJava有许多的……OrDefault操作符,可以以一个默认值替换异常。所有这些都是自解释的:elementAtOrDefault()、firstOrDefault()、lastOrDefault()和singleOrDefault()

count()

count()是一个有趣的操作符,它计算了上游Observable的事件的数量。顺便说一下,如果您需要知道上游Observable所发射的事件,有多少个是与给定的谓词匹配的话,filter(predict).count()是惯用的搭配方式。不要担心,所有的运算符都是惰性的,所以即使对于相当大的流也是可以的。显然,当流是无限的话,count()从不会发射值。您可以很容易的使用reduce()来实现'count():

Observable<Integer> size = Observable
    .just('A', 'B', 'C', 'D')
    .reduce(0, (sizeSoFar, ch) -> sizeSoFar + 1);

all(predicate) , exists(predicate) , and contains(value)

有时,确保给定的Observable所发射的事件都匹配某些谓词,有时这是一个很有用的功能。当上游完成时,并且所有值都与谓词匹配,all(predicate)操作符将会发出true。但是,一旦发现第一个不满足谓词的值,就会立即发出false。exists(exists)与all()完全相反;当第一个与谓词匹配的值被发现时,它会发射出true的信息,但如果上游完成了,却没有一个满足谓词的事件,它会发射false。通常,在exists()中我们的谓词仅将上游的值与一些常量进行比较。在这种情况下,可以使用contains()操作符:

Observable<Integer> numbers = Observable.range(1, 5);
numbers.all(x -> x != 4); // [false]
numbers.exists(x -> x == 4); // [true]
numbers.contains(4); // [true]

results matching ""

    No results matching ""