实现自定义操作符比较麻烦,因为背压(参见第226页的“背压”)和订阅机制需要考虑。因此,尽量通过现有的操作实现你的需求,而不是创造你自己的操作符。内置的运算符都是经过测试和验证了的。但是,如果没有一个提供的操作符可以解决你的问题的话,lift()元操作符将会有所帮助。compose()只有在对将现有的操作符组合在一起有用。另一方面,通过lift(),您可以实现几乎任何操作符,从而改变上游事件的流。
鉴于compose()可以转换Observables,lift()允许转换Subscribers。让我们回顾一下我们在第35页“Mastering Observable.create()”中所学到的知识。当您subscribe() 到一个Observable时,Subscriber 实例会将将Observable的回调包装起来,并导致Observable的create()方法被调用,并以订阅者作为参数。因此,每次我们订阅时,Subscriber都要穿过所有操作符访问原始 Observable。显然,在Observable 和subscribe()之间可以有任意数量的操作符,他们改变下游的事件,如下所示:
Observable
.range(1, 1000)
.filter(x -> x % 3 == 0)
.distinct()
.reduce((a, x) -> a + x)
.map(Integer::toHexString)
.subscribe(System.out::println);
但是,这里有一个有趣的事实:如果您查找RxJava的源代码并用操作符的方法体来替换操作符调用,这个相当复杂的运算符序列变得非常规则(注意reduce()是如何使用scan(). takelast(1). takelast(1)来实现的)。
Observable
.range(1, 1000)
.lift(new OperatorFilter<>(x -> x % 3 == 0))
.lift( OperatorDistinct.<Integer>instance())
.lift(new OperatorScan<>((Integer a, Integer x) -> a + x))
.lift( OperatorTakeLastOne.<Integer>instance())
.lift( OperatorSingle.<Integer>instance())
.lift(new OperatorMap<>(Integer::toHexString))
.subscribe(System.out::println);
几乎所有的运算符,不包括那些同时处理多个流的运算符(例如flatMap())都是通过lift()实现的。当我们的subscribe()在底最底下时候,将创建一个Subscriber<String>实例并将其传递给直接的前任。它可以是发射“true”事件的Observable<String> ,也可能是其他操作符的组合结果,在我们的例子中,是 map(Integer::toHexString). map() 自身并不会发射值,直到它受到一个Subscriber,说他想要这些事件。map() 做的(通过lift()操作符的帮助)只是透明的订阅到它的父级(在前面的例子是reduce()),然而它不能传递与自己接收的 Subscriber相同的 Subscriber,这是因为 subscribe()方法需要的是一个Observable<String>,而reduce()期待的订阅者是Subscriber<Integer>,毕竟,这就是map()在这里做的:将Integer转换为String。因此,map()操作符将创建一个新的虚拟Subscriber<Integer>,每次这个特殊的订阅者接收到任何东西时,它将为其应用Integer::toHexString函数并通知下游的Subscriber<String>
PS:作者的版本是1.1.6,所以新版本有了改动,学习底层运作即可
Looking under the hood of the map() operator
这基本上就是OperatorMap类正在做的事情:提供从下游(child)Subscriber<R>到 上游Subscriber<T> .的转换。这里是在RxJava中发现的真正实现,并有一些较小的可读性简化:
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<T, R> transformer;
public OperatorMap(Func1<T, R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<T> call(final Subscriber<R> child) {
return new Subscriber<T>(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(T t) {
try {
child.onNext(transformer.call(t));
} catch (Exception e) {
onError(e);
}
}
};
}
}
一个不寻常的细节是T和R泛型的颠倒顺序。map()操作符将从类型T的上游流转换为R类型的值。但是,操作符的责任是将Subscriber<R>(来自下游订阅)转换为Subscriber<T>(传递给上游Observable)。我们期望经由Subscribe<R>订阅,而map()操作符则用来对付Observable<T>的,所以它需要Subscriber<T>
确保您大致了解了上面的来自RxJava源代码的代码片段。了解map()是如何实现的(公认的最简单的操作符之一)将使您能够编写自己的操作符。每当你在流上 map()时候,实际上调用lift(),它需要一个OperatorMap的实例对象,该对象有一个transformer函数。该函数对T类型的上游事件进行操作,并返回R类型的下游事件。每当用户向您的操作符提供任何自定义function/transformation 时,请确保您捕捉到了所有未预料到的异常,并通过onError()方法将它们转发到下游。这还可以确保您从上游取消订阅,防止它发出进一步的事件。
请记住,在有人真正订阅之前,我们几乎没有创建一个新的Observable (lift(),与其他操作符一样,创建新的Observable ),并引用了底层的OperatorMap实例,而该实例将对我们的函数进行引用。但只有当有人真正订阅时,才能调用OperatorMap的call()函数。这个函数接收我们的Subscriber<String>(例如,包装System.out::println)。返回另一个 Subscriber<Integer>,这个操作符是用于订阅上游的,即上一个操作符。
这基本上是所有操作符的工作方式,包括内置的和已定义的。您接收到一个Subscriber 并返回另一个Subscriber ,并将其希望的内容增强并传递给下游订阅服务器。
Our first operator
这一次,我们想要实现一个操作符,它将发出每个奇数(1、3、5等)元素的toString()。最好用一些示例代码解释:
Observable<String> odd = Observable
.range(1, 9)
.lift(toStringOfOdd())
//Will emit: "1", "3", "5", "7" and "9" strings
您可以使用内置的操作符来实现相同的功能,我们只是为了教育目的而编写一个自定义操作符:
Observable
.range(1, 9)
.buffer(1, 2)
.concatMapIterable(x -> x)
.map(Object::toString);
buffer()将在第214页的“Buffering Events to a List”中介绍,暂时,您需要知道的是,buffer(1,2)将会将任何Observable<T>转换为 Observable<List<T>>,其中每个内部列表只有一个奇元素,跳过偶数元素。拥有类似于List(1)、List(3)...这样列表值的流,我们使用concatMapIterable()来重构一个平面流。但是为了学习经验,让我们实现一个单步执行的自定义操作符。自定义操作符可以位于以下两种状态之一:
它只接收奇数事件(1st,3st,5st..),然后将其应用toString()方法之后传递给下游
当接收到偶数事件时候直接扔掉
回合式的重复,奥座敷类似于这样:
<T> Observable.Operator<String, T> toStringOfOdd() {
return new Observable.Operator<String, T>() {
private boolean odd = true;
@Override
public Subscriber<? super T> call(Subscriber<? super String> child) {
return new Subscriber<T>(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(T t) {
if(odd) {
child.onNext(t.toString());
} else {
request(1);
}
odd = !odd;
}
};
}
};
}
request(1)调用将在稍后的237页的“Honoring the Requested Amount of Data”中介绍。现在,您可以这样理解:当一个Subscriber请求仅仅是事件的一个子集时,例如,只取前两个事件(take(2))------- RxJava只需要在内部通过调用request(2)来请求数据量。这个请求被传递到上游,我们只收到1和2。然而,我们删除掉了2(偶数),但我们有义务提供两个事件给下游。因此,我们必须请求一个额外的事件(request(1)),因此我们获取到了3。RxJava实现了一种非常复杂的机制,称为backpressure(背压),它允许订阅者只请求他们可以处理的事件的数量,从而保护他们不让生产者超过消费者。我们在226页的“Backpressure”对这个话题做了详细介绍。
PS:不幸的是,无论好坏,null是RxJava中的有效事件值; 换言之,Observable.just("A", null, "B")和其他流一样好。在设计自定义操作符以及应用操作符时,您需要考虑到这一点。但是,传递null通常被认为是nonidiomatic,而应该使用包装器值类型来代替。
您可能会遇到的另一个有趣的陷阱是,未能向i新的Subscriber提供子订阅者作为参数,例如这样:
<T> Observable.Operator<String, T> toStringOfOdd() {
//BROKEN
return child -> new Subscriber<T>() {
//...
}
}
Subscriber的无参数构造函数是对的,而且我们的操作符似乎也能工作。但让我们看看它是如何与无限流的结合的:
Observable
.range(1, 4)
.repeat()
.lift(toStringOfOdd())
.take(3)
.subscribe(
System.out::println,
Throwable::printStackTrace,
() -> System.out.println("Completed")
);
我们构建一个无限的数字流(1、2、3、4、1、2、3,4…),应用我们的操作符(“1”、“3”、“1”、“3”…),只取前三个值。这是绝对正确的,绝不会失败;但是毕竟流式惰性的。但是,从新的Subscriber(child)构造器中移除掉child,我们的Observable在收取了3个值之后,却从未收到Completion通知,这是为什么呢??
take(3)操作符只请求前三个值,并希望unsubscribe()。不幸的是,取消订阅请求从来没有到达原始的流,它一直在生成值。更糟糕的是,这些值由我们的自定义操作符处理并传递给下游Subscriber (take(3)),而它已经不再监听了。除了执行细节之外,作为经验法则,在编写自己的操作符时,将下游Subscriber作为构造函数参数传递给新Subscriber 。一个无参数的构造函数很少被使用,而且你基本不可能会用到它。
这只是在编写自己的操作符时遇到的问题的冰山一角。幸运的是,我们总能够通过内置的机制实现我们想要实现的目标。