大多数Java Api,如JDBC、Java.io、servlets 3以及专有的解决方案都是阻塞的。这意味着客户端线程必须等待,无论是结果还是副作用。然而,有一些用例是固有的异步的;例如,从一些外部源推送来事件。您可以按照以下方式构建block-streaming API:

while(true) {
    Event event = blockWaitingForNewEvent();
    doSomethingWith(event);
}

幸运的是,当一个领域天生具有异步性时,您很可能会发现一些基于回调用的API,例如,这在JavaScript中非常流行。这些api将接受某种形式的回调,通常是一个带有许多方法的接口,您可以通过实现这些方法来通知您各种事件。这种API最显著的例子就是几乎所有的图形用户界面库:例如Swing。当使用诸如onClick()或onKeyUp()这样的侦听器时,回调肯定是不可避免的。如果你在这样的环境中工作过,你一定很熟悉“callback hell(回调地狱)”这个术语。回调有相互嵌套的倾向,因此协调多个回调实际上是不可能的。下面是嵌套四次的回调示例:

button.setOnClickListener(view -> {   //第一层callback
    MyApi.asyncRequest(response -> {    //第二层callback
        Thread thread = new Thread(() -> {//第三层
            int year = datePicker.getYear();
            runOnUiThread(() -> {      //第四层
                button.setEnabled(false);
                button.setText("" + year);
            });
        });
        thread.setDaemon(true);
        thread.start();
    });
});

最简单的需求,比如当两个回调互相调用时的响应,就变成了一个噩梦,而且还会被多线程所阻碍。在本节中,我们将把基于callbackapi的API重构为RxJava,并提供所带来的好处,例如控制线程、生命周期和清理。

我最喜欢的流的例子之一是Twitter上的状态更新,也就是tweets(推文)。每秒有几千个用户更新。其中很多都伴随着地理、语言和其他元数据。为了这个练习的目的,我们将使用开源的Twitter4J库,可以使用基于回调的API来推送新tweets的子集。本章不打算解释Twitter4J是如何工作的,也不是为了提供健壮的示例。Twitter4J被选为一个使用带有有趣域的回调的API的好例子。实时读取tweets 的最简单的工作示例可能是这样的:

import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new twitter4j.StatusListener() {
        @Override
        public void onStatus(Status status) {
            log.info("Status: {}", status);
        }
        @Override
        public void onException(Exception ex) {
            log.error("Error callback", ex);
        }
        //other callbacks
    });
    twitterStream.sample();
    TimeUnit.SECONDS.sleep(10);
    twitterStream.shutdown();

调用twitterStream.sample()启动一个后台线程,该线程登录到Twitter并等待新消息。每次出现tweet(新的推文)时,就会执行onStatus回调。执行可以在线程之间跳转,因此我们不能再依赖于抛出异常。而是使用onException()通知。在休眠10秒钟后,我们关闭流,清理所有底层资源,比如HTTP连接或线程。

总的来说,它看起来没有那么糟糕,问题是这个程序没有做任何事情。在现实生活中,你可能会以某种方式处理每个状态信息(tweet)。例如,将它保存到一个数据库中或提供机器学习算法。从技术上讲,您可以将该逻辑放入回调中,但是这将对基础设施调用和领域逻辑的连接起来了。简单的委托给一个单独的类是更好的,但是不幸的是这个类不能重用。我们真正想要的是技术领域(从HTTP连接中获取数据)和解释输入数据的业务领域之间的完全隔离。所以我们建立了第二层回调:

void consume(
                Consumer<Status> onStatus,
                Consumer<Exception> onException) {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    onStatus.accept(status);
                }
                @Override
                public void onException(Exception ex) {
                    onException.accept(ex);
                }
                //other callbacks
    });
    twitterStream.sample();
}

通过添加这个额外的抽象级别,我们现在可以以各种方式重用consume()方法。想象一下,如果不是日志记录,而是持久性、分析或欺诈检测:

consume(
    status -> log.info("Status: {}", status),
    ex -> log.error("Error callback", ex)
);

但是我们只是把问题移到了层次结构中。如果我们想计算每秒的tweet数量呢?还或者需求只是消费前5个tweet呢?如果我们想要有多个监听器呢?在这些情况下,每一种情况都会打开一个新的HTTP连接。最后但同样重要的是,这个API不允许在我们完成时取消订阅,这可能会导致资源泄漏。我们希望您意识到我们正朝着一个Rx驱动的API前进。我们可以返回一个Observable<Status>,每个订阅者想要什么就订阅什么,而不是把回调传递到可以执行的地方。但是,请记住,下面的实现仍然为每个订阅者打开一个新的网络连接

Observable<Status> observe() {
    return Observable.create(subscriber -> {
        TwitterStream twitterStream =
            new TwitterStreamFactory().getInstance();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                subscriber.onNext(status);
            }
            @Override
            public void onException(Exception ex) {
                subscriber.onError(ex);
            }
            //other callbacks
        });
        //subscriber.add(Subscriptions.create(twitterStream::shutdown));
    });
}

在这一点上,我们可以简单地调用observe(),它只创建一个Observable并且不与外部服务器联系的。我们了解到,除非有人真正订阅,否则create()的内容不会被执行。:

observe().subscribe(
    status -> log.info("Status: {}", status),
    ex -> log.error("Error callback", ex)
);

与consume(…)相比,这里最大的不同是,我们不被迫将回调作为参数给observe()方法。相反,我们可以返回Observable<Status> ,传递它,将它存储在某个地方,并可以随时随地使用它,我们觉得它是必需的。我们也可以用其他Observables来组成这个Observable ,这就是第三章的内容。我们没有讨论的一个重要方面是资源清理。当有人取消订阅时,我们应该关闭TwitterStream以避免资源泄露。我们已经知道了两种技术;让我们先用简单的方法:

@Override
public void onStatus(Status status) {
    if (subscriber.isUnsubscribed()) {
        twitterStream.shutdown();
    } else {
        subscriber.onNext(status);
    }
}
@Override
public void onException(Exception ex) {
    if (subscriber.isUnsubscribed()) {
        twitterStream.shutdown();
    } else {
        subscriber.onError(ex);
    }
}

当有定于这只想接收流的一部分时候订,我们的Observable 将确保清理资源。我们知道第二种技术来实现清理,不需要等待上游事件。当订阅者取消订阅时,我们立即调用shutdown(),而不是等待下一条tweet来触发清理行为(最后一行):

twitterStream.addListener(new StatusListener() {
    //callbacks...
});
twitterStream.sample();
subscriber.add(Subscriptions.create(twitterStream::shutdown));

有趣的是,这个Observable 模糊了Cold和Hot的区别。一方面,它代表的是不受我们控制的外部事件(Hot行为)。另一方面,在我们实际subscribe()之前,事件不会开始流(因为没有底层HTTP连接)到我们的系统。我们忘记的另一个副作用仍然在悄悄出现:每个新subscribe()将启动一个新的后台线程,创建一个到外部系统的新连接。相同的Observable<Status>的实例应该可以在许多订阅者中重用,而且由于Observable<Status>是惰性的,所以从技术上讲,您应该能够在启动时调用observe()并将其保存在一些单例中。但是,对于每个订阅者,当前的实现只是打开一个新的连接,有效地从网络中多次获取相同的数据。我们当然希为该流注册多个订阅者,但是没有理由每个订阅者都应该独立地获取相同的数据。我们真正想要的是一个 pub-sub(发布-订阅)行为,在一个发布者(外部系统)中,将数据交付给多个订阅者。我们现在将探讨这个问题的一些解决方案。

results matching ""

    No results matching ""