在企业应用程序中,我很少看到显式的并发。大多数情况下,单个请求是由单个线程处理的。相同的线程执行以下步骤:
接受TCP / IP连接
解析HTTP请求
调用controller或servlet
阻塞数据库调用 •
生成结果
响应编码(如JSON)
将原始字节推回客户端
当后台向数据库发出几个独立的请求时,这种分层模型会影响用户延迟。它们按顺序执行,而你可以很容易地将它们并行化。此外可伸缩性的影响。例如,在Tomcat中,在负责处理请求的执行器中默认有200个线程。这意味着我们不能处理超过200个并发连接。如果突然出现短时间的大流量,传入的连接就会排队,服务器响应的延迟也会更高。然而,这种情况不会永远持续下去,Tomcat最终会开始拒绝输入的流量。我们将在接下来的章节中使用大量的部分(参见第169页“使用Netty和RxNetty的非阻塞HTTP服务器”)如何处理这一令人尴尬的缺点。就目前而言,让我们继续使用传统架构。在单个线程中执行请求处理的每一步都有一些好处,例如改进缓存本地化,最小化同步开销。不幸的是,在经典的应用程序中,由于总延迟是每个层延迟的总和,一个不正常的组件可能会对总延迟产生负面影响。此外,有时有许多彼此独立的步骤,并且可以并发执行。例如,我们调用多个外部api或执行几个独立的SQL查询。
JDK完全支持并发性,尤其是在Java 5的Executor Service和Java 8的CompletableFuture下 。尽管如此,它并没有得到广泛的应用。例如,让我们看看以下没有并发性的程序:
Flight lookupFlight(String flightNo) {
//...
}
Passenger findPassenger(long id) {
//...
}
Ticket bookTicket(Flight flight, Passenger passenger) {
//...
}
SmtpResponse sendEmail(Ticket ticket) {
//...
}
客户端代码如下:
Flight flight = lookupFlight("LOT 783");
Passenger passenger = findPassenger(42);
Ticket ticket = bookTicket(flight, passenger);
sendEmail(ticket);
这是典型的阻塞式代码,类似于在许多应用程序中可以找到的代码。但是,如果从延迟的角度仔细观察,前面的代码片段有四个步骤;然而,前两个是相互独立的。只有第三步(bookTicket())需要lookupFlight()和findPassenger() 的结果。这里存在着利用并发性的明显机会。然而,很少有开发人员会沿着这条路走下去,因为它需要笨拙的线程池、Futures和回调。如果API已经与Rx兼容的话我们如何做呢?请记住,您可以简单地将阻塞、遗留代码封装起来,就像我们在本章开头所做的那样:
Observable<Flight> rxLookupFlight(String flightNo) {
return Observable.defer(() ->
Observable.just(lookupFlight(flightNo)));
}
Observable<Passenger> rxFindPassenger(long id) {
return Observable.defer(() ->
Observable.just(findPassenger(id)));
}
从语义上说,这些rx开头的方法与之前的方法做同样的事,而且方式也相同;换言之,它们在默认情况下都是阻塞的。从客户端的角度来看,除了详细的API,我们没有获取到任何东西。
Observable<Flight> flight = rxLookupFlight("LOT 783");
Observable<Passenger> passenger = rxFindPassenger(42);
Observable<Ticket> ticket =
flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
ticket.subscribe(this::sendEmail);
传统的阻塞程序和使用Observable封装的方法都是以相同的方式来工作的。默认情况下它是l惰性的,但操作顺序基本上是相同的。首先,我们创建Observable<Flight>,正如您已经知道的,默认情况下没有做任何事情。除非有人明确要求Flight,否则这个Observable只是一个惰性的占位符。我们已经了解到这是一种很有价值的Cold类型的吗Observable。同样对于Observable<Passenger>也是如此,;我们有两个占位符,分别是Fight和Passenger类型的,目前还没有出现任何副作用。没有数据库查询或web服务调用。如果我们决定停止处理,不会有任何多余的工作执行了。
要继续使用bookTicket(),我们需要具体的Fight(航班)和Passenger(乘客)实例。通过使用toBlocking()操作符,可以很容易地阻塞这两个Observables 。但是,我们希望尽可能避免阻塞,以减少资源消耗(尤其是内存),允许更大的并发性。另一个糟糕的解决方案是在Fight的Observable和Passenger的Observable上调用subscribe()方法,以某种方式等待两个回调结束。当Observable是阻塞的时,它相当简单,但是如果回调异步出现,而你需要等待它们以同步一些全局状态,那么这很快就变成了一场噩梦。当然一个嵌套的subscribe()并不是一种习惯用法,通常,您需要的只是一个消息流对应单个订阅。因为只有一个线程,所以回调在JavaScript中很有意义。同时订阅多个Observable对象的惯用方法是zip和zipWith。您可能认为zip是一种连接两个独立数据流的方法。但是,更常见的情况是,zip仅仅是用来连接两个单条目的Observables。ob1.zip(ob2). subscribe(…),本质上是指当ob1和ob2完成时才会接收到一个事件(它们自己发出一个事件)。所以,当你看到zip的时候,更有可能的是,某人只是想在两个或多个在执行路径上forked的Observables上进行join连接(请与Java7的fork-join一起类比思考,翻译的比较拗口)。zip是一种异步等待两个或多个值的方法,不管哪一个值最后出现。
让我们回到flight.zipWith(passenger, this::bookTicket)(使用方法引用而不是显式lambda的简短语法,如代码示例中所示)。我保留所有类型信息而不是流畅地连接表达式的原因是,我希望您注意返回类型。flight.zipWith(passenger, this::bookTicket)只有在flight和passenger都完成时才会调用回调;它返回一个新的Observable ,您应该立即将其视为数据的惰性占位符。令人惊讶的是,此时此刻还没有开始计算。我们只是简单地封装了几个数据结构,但是没有触发任何行为。只要没有人订阅Observable<Ticket>,RxJava就不会运行任何后端代码。这是最后一个声明中最后发生的事情:ticket.subscribe()显式地请求Ticket。
PS:Where to Subscribe?
注意您在领域代码中看到的subscribe()的位置。通常,您的业务逻辑仅仅是将其组合在一起,并将它们返回到某种框架或脚手架层中。实际的订阅发生在背后的web框架或一些胶水代码。你自己调用subscribe()服务并不是一个坏习惯,但是尽量把它推出去。
要了解执行的流程,可以查看下面的内容。我们订阅了ticket(译者注:注意最后一行,我们是在ticket上调用的subscribe方法),因此RxJava必须透明地订阅fight(即Observable<Flight>)和passenger(即Observable<Passenger>) 。此时,真正的逻辑发生了。因为两个Observables都是Cold类型的,而且目前还没有涉及到并发,所以到fight(即Observable<Flight>)的第一个订阅在调用线程中调用lookupFlight()这个阻塞方法。当lookupFlight()完成时,RxJava可以订阅passenger(即Observable<Passenger>)。然而,它已经从同步的fight(即Observable<Flight>)中收到了一个Fight实例,rxFindPassenger()以阻塞的方式调用findPassenger(),并最终接受到了一个Passenger 实例。在这个节骨眼上,数据流返回到下游。Flight(航班)和Passenger(乘客)视同提供的lambda表达式进行结合,并将结果传递给 ticket.subscribe() 。
这听起来像很多工作,考虑到它的行为和工作本质上就像我们开始时的阻塞代码一样。但是现在,我们可以在不改变任何逻辑的情况下声明式的应用并发。如果我们的业务方法返回的是 Future<Flight>(或者是CompletableFuture<Flight>,这也一点没关系),我们将作出两项决定:
对lookupFlight()的底层调用早已经开始,并且无处可以惰性。我们不阻止这种方法,但已经开始工作执行了。
我们对并发性方面的没有任何控制,它是一个方法实现,它决定一个Future的任务是在线程池中调用的,还是每个请求的新线程,等等。
RxJava为用户提供了更多的控制。仅仅因为 Observable<Flight>是在不考虑并发性的情况下实现的,这并不意味着我们以后不能应用它。现实世界中总的Observables通常早已是异步的,但在极少数情况下,您必须将并发性添加到现有的Observable中。我们的API的使用者,而不是实现者,在Observable是同步的情况下,应该可以自由的选择线程机制。所有这些都是通过使用subscribeOn()操作符实现的:
Observable<Flight> flight =
rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
Observable<Passenger> passenger =
rxFindPassenger(42).subscribeOn(Schedulers.io());
在订阅之前的任何时候,我们都可以注入subscribeOn()操作符,并提供一个所谓的Scheduler(调度器)实例。在本例中,我使用了Schedulers.io()工厂方法,但是我们同样可以使用自定义的ExecutorService并快速将其与用Scheduler包装。当订阅发生时,传递给Observable . create()的lambda表达式将在提供的Scheduler中执行,而不是在客户机线程中执行(即调用订阅的线程)。现在还没有必要,但我们将在在141页的“What Is a Scheduler?”中深入研究Scheduler。目前,将Scheduler视为线程池即可。
调度器如何改变我们程序的运行时行为?请记住,zip()操作符订阅两个或多个Observables并等待进行结对(即元组)。当异步发生订阅时,所有上游Observables都可以并发地调用它们的底层的阻塞代码。如果您现在运行您的程序,那么当ticket.subscribe()被调用时候,lookupFlight()和findPassenger()将立即开始执行,并并发执行。然后,bookTicket()将在前面提到的Observables中最慢的那个Observable发射出一个值后立即被应用。
谈论慢度这个概念时候时,当给定的可观察值在指定的时间内不发出任何值时,您可以声明式地应用一个超时:
rxLookupFlight("LOT 783")
.subscribeOn(Schedulers.io())
.timeout(100, TimeUnit.MILLISECONDS)
与往常一样,在出现错误时,它们是向下传播而不是随意抛出的。因此,如果lookupFlight()方法的时间超过100毫秒,那么您将以TimeoutException而结束,而不是向每个订阅者发送的值。timeout()操作符将在第251页的“Timing Out When Events Do Not Occur”中详尽地解释。
假设您的API已经是基于Rx驱动的,那么我们最后有两种并行运行的方法。但是,我们在bookTicket()作弊了Ticket,它仍然返回Ticket,这意味着它是阻塞的。即使订票非常快,但还是值得像下面这样声明,只是为了让API更容易演变。演进可能意味着以完全非阻塞的方式添加或者使用并发(参见第5章)。记住,将非阻塞API转换为阻塞的API,可以通过调用toBlocking()简单的做到。而相反的情况往往是具有挑战性的,需要大量额外的资源。而而且,很难预测像rxBookTicket()这样的方法的演进,如果它们曾经接触过网络或文件系统,更不用说数据库了,那么用一个Observable来包装它,并为他提指示可能的延迟,这是非常值得做的:
Observable<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
//...
}
但是现在zipWith()返回一个尴尬的 Observable<Observable<Ticket>>,导致代码无法编译通过。一个很好的经验法则是,无论何时您看到双包装类型(例如 Optional<Optional<...>>)那么一定是丢了flatMap()调用。这里也是这样。zipWith()采用一个元组事件,并应用一个将这些事件作为参数的函数,并将结果鸳鸯放入到下游Observable中。这就是为什么我们之前看到的是Observable<Ticket>
,但现在它是Observable<Observable<Ticket>> 的原因,这里的Observable<Ticket>(注意:这里说的是Observable<Observable<Ticket>>的事件类型Observable<Ticket>)是supplied函数的结果。两种方法可以解决这个问题。一种方法是使用从zipWith返回的中间元组:
import org.apache.commons.lang3.tuple.Pair;
Observable<Ticket> ticket = flight
.zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
.flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));
如果使用来自第三方库的显式Pair,那么方法引用实际上会起作用:Pair::of,但是我们再次决定,可见类型信息比保存几个键值更有价值。毕竟,我们读代码的时间比写代码的时间长得多。中间元组方法的替代方法是应用带有恒等式函数的 flatMap:
Observable<Ticket> ticket = flight
.zipWith(passenger, this::rxBookTicket)
.flatMap(obs -> obs);
这个obs -> obs的恒等式lambda表达式似乎没有做任何事情,如果那时一个map()操作符的话,那确实是这样。但是请记住,flatMap()将函数应用到Observable的每个值,因此这个函数以Observable<Ticket>作为参数。稍后,结果不会像map()一样直接放在结果流中。相反,返回值( Observable<T>类型)被“压平”化,导致结果是Observable<T>而不是 Observable<Observable<T>>
。在通过调度器处理时,flatMap()操作符变得更加强大。您可能会认为flatMap()仅仅是为了避免嵌套的Observable<Observable<...>>的问题,但它比这个更基本。
PS:Observable.subscribeOn() Use Cases
人们很容易认为subscribeOn()是RxJava中用于并发的合适工具。这个操作符是有效的,但是您几乎看不到subscribeOn()的使用(而且还经常被描述为observeOn())。在现实中,Observables来自于异步源,因此不需要自定义Scheduler。在这一章中,我们使用subscribeOn()来显式地展示如何对现有应用程序进行升级,从而有选择地使用响应式原则。但实际上,Scheduler和subscribeOn()是最后的武器,却不是常见的方式。