图2-1 响应式编程是关于数据的流动和对它的响应
响应式编程是一种面向数据流和数据传播的开发模型。在响应式编程中,刺激是在流中传递的数据,称为流(streams)。有许多方法可以实现一个响应式编程模型。在本报告中,我们将使用响应式拓展(Reactive Extensions) (http://reactivex.io/),其中的流称为observables,而消费者订阅这些observables的内容并对这些值作出响应/反应(见图2-1)。
为了减少这些概念抽象,让我们看一个使用RxJava的例子(https://github.com/ReactiveX/RxJava),他是实现响应性拓展的Java工具类。这些示例代码位于代码库的reactive-programming文件夹中。
observable.subscribe(
data -> { // onNext
System.out.println(data);
},
error -> { // onError
error.printStackTrace();
},
() -> { // onComplete
System.out.println("No more data");
}
);
在这段代码中,代码标识的是观察(或者说是订阅)了一个Observable(注:你可以将它想象为Java8 中农stream放回的流对象,但是在RxJava中,他是一个观察者模式的实现),当值在流中传输时被,这个观察者就会的到通知。订阅者可以接收三种类型的事件。onNext在有新值时会被调用,当在流中发射一个Error或在某个阶段抛出Exception时调用onError。当到达流的末尾时,将调用onComplete回调,这在无限流中是不会发生的。RxJava包含一组操作符,用于生成、转换和协调Observables,例如map是将一个值转换为另一个值的映射,或者flatMap可以生产一个Observable和链接其他的异步操作。
// sensor is an unbound observable publishing values.
sensor
// Groups values 10 by 10, and produces an observable
// with these values.
.window(10)
// Compute the average on each group
.flatMap(MathObservable::averageInteger)
// Produce a json representation of the average
.map(average -> "{'average': " + average + "}")
.subscribe(
data -> {
System.out.println(data);
},
error -> {
error.printStackTrace();
}
);
RxJava v1.x定义了不同类型的流,如下所示:
- Observable代表是有界或者无界的流,期望包含一系列的值。
- Single是具有单一值的流,通常是操作的延迟结果,类似于futures 或promise。
- Completable是没有值的流,但指示操作是完成还是失败?
RxJava 2
虽然最近已经发布了RxJava 2.x,该报告仍然使用了以前的版本(RxJava 1.x)。RxJava 2.x提供相似的概念。RxJava 2添加了两种新类型的流。Observable 是用于不支持背压的流,而Flowable则是一个支持背压的Observable。RxJava 2还引入了Maybe类型,该类型可以对可能有0或者1个条目或存在一个error的流进行建模。
我们可以用RxJava做什么?例如,我们可以描述异步操作的序列并编排它们。让我们假设您想下载一个文档,处理它,并上传它。下载和上传操作是异步的。为了开发这个序列,你可以使用如下的方法:
// Asynchronous task downloading a document
Future<String> downloadTask = download();
// Create a single completed when the document is downloaded.
Single.from(downloadTask)
// Process the content
.map(content -> process(content))
// Upload the document, this asynchronous operation
// just indicates its successful completion or failure.
.flatMapCompletable(payload -> upload(payload))
.subscribe(
() -> System.out.println("Document downloaded, updated
and uploaded"),
t -> t.printStackTrace()
);
您还可以编排异步任务。例如,要将两个异步操作的结果组合在一起,可以使用zip操作符组合不同流的值:
// Download two documents
Single<String> downloadTask1 = downloadFirstDocument();
Single<String> downloadTask2 = downloadSecondDocument();
// When both documents are downloaded, combine them
Single.zip(downloadTask1, downloadTask2,
(doc1, doc2) -> doc1 + "\n" + doc2)
.subscribe(
(doc) -> System.out.println("Document combined: " + doc),
t -> t.printStackTrace()
);
这些操作符的使用给了您超能力:您可以以声明式和优雅的方式协调异步任务和数据流。这与响应式微服务有什么关系?为了回答这个问题,我们来看看响应式系统。
Reactive Streams
你也许听说过响应式流(Reactive Streams)(http://www.reactivestreams.org/)。响应式流一种为异步流处理提供背压的标准。它提供了一组最小的接口和协议,这些接口和协议描述了实现非阻塞背压的异步数据流的操作和实体。它不定义操作流的操作符,主要用作互操作层。这一举措得到了Netflix、Lightbend和Red Hat等公司的支持。