RxJava技术的全部意义在于它支持push(推送),因此Observable和相关的Observer的类型签名中支持被推送的事件类型(即泛型)。这通常是伴随着异步的,在下一节将讨论。但是,Observable类型也支持异步反馈通道(有时也称为async-pull或reactive-pull),作为在异步系统中控制流量或背压的一种方法。本章后面的部分将讨论流量控制以及该机制如何自适应的。

为了通过push来支持接收事件,可以通过订阅连接一个Observable / Observer对。Observable表示数据流,并可以被Observer(观察者)订阅:

interface Observable<T> {
Subscription subscribe(Observer s)
}

在订阅中,观察者可以有三种类型的事件推到它:

  • 通过onNext()函数传递的的数据

  • 通过onError()函数传递来的Error(exceptions或throwables)

  • 通过onCompleted()函数传递的流完成事件

interface Observer<T> {
void onNext(T t)
void onError(Throwable t)
void onCompleted()
}

onNext()方法可能永远不会被调用,也可能会被调用一次,多次,或者无限多次。onError() 和 onCompleted() 都是终端事件,意思就是其中的任何一个只能被调用一次。当终端事件被调用时候,Observable流变为已完成,不再可以发送其他事件。如果流是无限的,而且也不会发生失败的情况,那么终端事件可能从不会发生。

正如在211页的“Flow Control”一节中以及226页“Backpressure”一节中讲的,还有另外一种类型的签名,该签名允许以pull的方式交互:

interface Producer {
void request(long n)
}

这是与一个称为 Subscriber更高级的Observer一起使用的(更多的细节请参考32页的“Controlling Listeners by Using Subscription and Subscriber<T>”)

interface Subscriber<T> implements Observer<T>, Subscription {
void onNext(T t)
void onError(Throwable t)
void onCompleted()
...
void unsubscribe()
void setProducer(Producer p)
}

作为 Subscription接口的一部分,unsubcribe函数允许订阅方从Observable的流中取消订阅。 setProducer函数自己Producer类型诶用于在用于流量控制的生产者和消费者之间形成双向交互渠道。

results matching ""

    No results matching ""