Rx的Observable是Iterable的异步对偶形式。“对偶”的含义是:Observable提供了一个Iterable的所有功能,除了数据的反向流:它是基于push而不是基于pull。下面的表格显示了基于push和pull类型的泛函:
根据表,不是由消费者在next()下pull数据,而是由生产者push到onNext(T)。成功的终止是通过 onCompleted()回调,而不是阻塞线程,直到所有项都被迭代。也不是将异常抛出到调用堆栈上,而是将错误作为事件发送到onError(Throwable)回调。
通俗而言:你从Iterable和Iterator中同步的pull数据的方式,完全可以通过Observable 和Observer ,以一种异步的基于push的形式爱完成。
例如,在Java 8中,Iterable可以通过Java .util. stream升级到具有函数组合功能的形式,像如下这样工作:
// Iterable<String> as Stream<String>
// that contains 75 strings
getDataFromLocalMemorySynchronously()
.skip(10)
.limit(5)
.map(s -> s + "_transformed")
.forEach(System.out::println)
这将从getDataFromLocalMemorySynchronously()方法中检索75字符串,并只取11-15这5个值,其他的全部忽略,ing将其映射为特殊格式的字符串,最终打印。
而RxJava的版本如下:
// Observable<String>
// that emits 75 strings
getDataFromNetworkAsynchronously()
.skip(10)
.take(5)
.map(s -> s + "_transformed")
.subscribe(System.out::println)
这将接收5个字符串(15被发射,但前10个被删除),然后取消订阅(忽略或停止将要发出的其余字符串)。它转换和打印字符串,就像前面的Iterable / Stream示例一样。
换句话说,Rx Observable允许用通过push推送的异步数据进行编程,就像环绕在Iterables 和Lists 周围的基于pull的Streams一样。