图2-1 响应式编程是关于数据的流动和对它的响应

响应式编程是一种面向数据流和数据传播的开发模型。在响应式编程中,刺激是在流中传递的数据,称为流(streams)。有许多方法可以实现一个响应式编程模型。在本报告中,我们将使用响应式拓展(Reactive Extensions) (http://reactivex.io/),其中的流称为observables,而消费者订阅这些observables的内容并对这些值作出响应/反应(见图2-1)。

为了减少这些概念抽象,让我们看一个使用RxJava的例子(https://github.com/ReactiveX/RxJava),他是实现响应性拓展的Java工具类。这些示例代码位于代码库的reactive-programming文件夹中。

observable.subscribe(
    data -> { // onNext
        System.out.println(data);
    },
    error -> { // onError
        error.printStackTrace();
    },
    () -> { // onComplete
        System.out.println("No more data");
    }
);

在这段代码中,代码标识的是观察(或者说是订阅)了一个Observable(注:你可以将它想象为Java8 中农stream放回的流对象,但是在RxJava中,他是一个观察者模式的实现),当值在流中传输时被,这个观察者就会的到通知。订阅者可以接收三种类型的事件。onNext在有新值时会被调用,当在流中发射一个Error或在某个阶段抛出Exception时调用onError。当到达流的末尾时,将调用onComplete回调,这在无限流中是不会发生的。RxJava包含一组操作符,用于生成、转换和协调Observables,例如map是将一个值转换为另一个值的映射,或者flatMap可以生产一个Observable和链接其他的异步操作。

// sensor is an unbound observable publishing values.
sensor
    // Groups values 10 by 10, and produces an observable
    // with these values.
    .window(10)
    // Compute the average on each group
    .flatMap(MathObservable::averageInteger)
    // Produce a json representation of the average
    .map(average -> "{'average': " + average + "}")
    .subscribe(
        data -> {
            System.out.println(data);
        },
        error -> {
            error.printStackTrace();
        }
);

RxJava v1.x定义了不同类型的流,如下所示:

  • Observable代表是有界或者无界的流,期望包含一系列的值。
  • Single是具有单一值的流,通常是操作的延迟结果,类似于futures 或promise。
  • Completable是没有值的流,但指示操作是完成还是失败?

RxJava 2

虽然最近已经发布了RxJava 2.x,该报告仍然使用了以前的版本(RxJava 1.x)。RxJava 2.x提供相似的概念。RxJava 2添加了两种新类型的流。Observable 是用于不支持背压的流,而Flowable则是一个支持背压的Observable。RxJava 2还引入了Maybe类型,该类型可以对可能有0或者1个条目或存在一个error的流进行建模。

我们可以用RxJava做什么?例如,我们可以描述异步操作的序列并编排它们。让我们假设您想下载一个文档,处理它,并上传它。下载和上传操作是异步的。为了开发这个序列,你可以使用如下的方法:

// Asynchronous task downloading a document
Future<String> downloadTask = download();
// Create a single completed when the document is downloaded.
Single.from(downloadTask)
    // Process the content
    .map(content -> process(content))
    // Upload the document, this asynchronous operation
    // just indicates its successful completion or failure.
    .flatMapCompletable(payload -> upload(payload))
    .subscribe(
        () -> System.out.println("Document downloaded, updated
            and uploaded"),
        t -> t.printStackTrace()
);

您还可以编排异步任务。例如,要将两个异步操作的结果组合在一起,可以使用zip操作符组合不同流的值:

// Download two documents
Single<String> downloadTask1 = downloadFirstDocument();
Single<String> downloadTask2 = downloadSecondDocument();
// When both documents are downloaded, combine them
Single.zip(downloadTask1, downloadTask2,
    (doc1, doc2) -> doc1 + "\n" + doc2)
    .subscribe(
        (doc) -> System.out.println("Document combined: " + doc),
        t -> t.printStackTrace()
);

这些操作符的使用给了您超能力:您可以以声明式和优雅的方式协调异步任务和数据流。这与响应式微服务有什么关系?为了回答这个问题,我们来看看响应式系统。

Reactive Streams

你也许听说过响应式流(Reactive Streams)(http://www.reactivestreams.org/)。响应式流一种为异步流处理提供背压的标准。它提供了一组最小的接口和协议,这些接口和协议描述了实现非阻塞背压的异步数据流的操作和实体。它不定义操作流的操作符,主要用作互操作层。这一举措得到了Netflix、Lightbend和Red Hat等公司的支持。

results matching ""

    No results matching ""