在我们的示例应用程序中,我们现在必须通过电子邮件发送一张Ticket(票)列表。但我们必须记住以下几点要求:
这个列表可能很长。
发送电子邮件可能需要几毫秒甚至几秒。
在出现故障时,应用程序必须保持优雅的运行,但是关于tickets的报告最后不能交付。
最后一个要求很快排除了简单的tickets.forEach(this::sendEmail),因为它急切地抛出一个异常,并且不会继续使用尚未交付的tickets(票) 。异常实际上是类型系统的一个糟糕的后门,就像回调,当您想要以一种更健壮的方式管理它们时,回调就不那么友好了。这就是为什么RxJava将它们显式地作为特别的通知的原因,但要有耐心,我们会实现的。根据错误处理的要求,我们的代码看起来差不多是这样的:
List<Ticket> failures = new ArrayList<>();
for(Ticket ticket: tickets) {
try {
sendEmail(ticket);
} catch (Exception e) {
log.warn("Failed to send {}", ticket, e);
failures.add(ticket);
}
}
然而,前两个要求没有得到解决。我们没有理由一直按顺序发送邮件。传统上,我们可以使用ExecutorService池,将每个电子邮件作为单独的任务提交:
List<Pair<Ticket, Future<SmtpResponse>>> tasks = tickets
.stream()
.map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
.collect(toList());
List<Ticket> failures = tasks.stream()
.flatMap(pair -> {
try {
Future<SmtpResponse> future = pair.getRight();
future.get(1, TimeUnit.SECONDS);
return Stream.empty();
} catch (Exception e) {
Ticket ticket = pair.getLeft();
log.warn("Failed to send {}", ticket, e);
return Stream.of(ticket);
}
})
.collect(toList());
//------------------------------------
private Future<SmtpResponse> sendEmailAsync(Ticket ticket) {
return pool.submit(() -> sendEmail(ticket));
}
这里相当数量的代码是所有Java程序员都应该熟悉。然而,它似乎过于冗长和意外复杂。首先,我们遍历tickets(票)并将它们提交给线程池。确切地说,我们调用sendEmailAsync()帮助方法,它将 sendEmail()方法调用包装在 Callable<SmtpResponse>,并提交给一个线程池。甚至更精确的讲,Callable实例对象首先被放置在线程池前面的一个无限制的(默认的)队列中,如果不能及时处理任务,就无法及时地快速提交任务,从而导致响应流和背压工作(参见226页的“Backpressure”)。
因为稍后一旦我们失败,我们需要一个Ticket实例,我们必须跟踪哪个Future负责哪个Ticket,然后包装在Pair中。在实际的生产代码中,您应该考虑一个更有意义的、专用的容器,比如TicketAsyncTask值对象。我们收集所有这样的Pair对象,然后进行下一次迭代。此时,线程池已经并发的运行多个sendEmail()调用,这正是我们的目标。第二个循环遍历所有的Future,并试图通过阻塞(get())和等待完成,来结束它们。如果get()成功返回,我们将跳过这样的Ticket。但是,如果有异常,我们返回与这个任务相关的Ticket实例,我们知道它失败了,我们希望稍后报告它。flatMap()允许我们返回零或一个元素(或任何数字),与Stream.map()相反,后者总是需要一个元素。
你可能想知道为什么我们需要两个循环,而不是一个这样的循环:
//WARNING: code is sequential despite utilizing thread pool
List<Ticket> failures = tickets
.stream()
.map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
.flatMap(pair -> {
//...
})
.collect(toList());
这是一个非常有趣的bug,如果您不理解Java 8中的流是如何工作的,那么就很难找到它。因为Stream(流)——就像Observable 那样——是惰性的,他们只在请求终端操作的时候(例如,终端操作包括collect(toList())),才会来评估底层的集合。这意味着这个启动后台任务的map()操作不会立即在所有票据上执行;相反,它一次只执行一个操作,另一个方法是使用flatMap()操作。此外,我们真的开始了一个Future,并阻塞等待它,然后开始第二个Future,阻塞等待它,诸如此类。需要一个中间集合来强制评估,挺不是因为代码清晰度和可读性。毕竟,List<Pair<Ticket, Future<SmtpResponse>>> 确实不是更具可读性。
这是大量的工作,而且错误的可能性很高,因此开发人员不愿意每天使用并发代码也就不足为奇了。当有一个异步任务的池,并且我们希望在它们完成时处理它们的时候,JDK的鲜为人知的ExecutorCompletionService有时候就会被使用。此外,Java 8带来了CompletableFuture (见第193页上的“CompletableFuture和Streams”),它完全是响应式的和非阻塞的。但是RxJava如何在这里提供帮助呢?首先,假设发送电子邮件的API已经被改造为使用RxJava:
import static rx.Observable.fromCallable;
Observable<SmtpResponse> rxSendEmail(Ticket ticket) {
//unusual synchronous Observable
return fromCallable(() -> sendEmail())
}
没有涉及到并发,只是将sendEmail()封装在一个Observable内部。这是一种罕见的Observable;通常,您将在实现中使用subscribeOn(),因此默认情况下,Observable 是异步的。此时此刻,我们可以像以前一样迭代所有的tickets :
List<Ticket> failures = Observable.from(tickets)
.flatMap(ticket ->
rxSendEmail(ticket)
.flatMap(response -> Observable.<Ticket>empty())
.doOnError(e -> log.warn("Failed to send {}", ticket, e))
.onErrorReturn(err -> ticket))
.toList()
.toBlocking()
.single();
PS:Observable.ignoreElements()
在我们的示例中很容易看到内部的flatMap()忽略响应并返回空流。在这种情况下,flatMap()是过量的;ignoreElements()操作符要高效得多。ignoreElements()简单地忽略了所有已发出的值和转发onCompleted()或onError()通知。因为我们忽略了实际的响应,只处理错误,ignoreElements()在这里工作得很好。
我们所感兴趣的是在外面的flatMap()里。如果只是flatMap(this:rxSendEmail),代码就可以工作;然而,任何从rxSendEmail发出的故障都将终止整个流。但是我们想要“捕获”所有发出的错误并收集它们以供以后使用。我们对Stream.flatmap()使用类似的技巧:如果成功地释放了响应,我们将其转换为空的 Observable 。这基本上意味着我们丢弃了成功的tickets。但是,如果发生故障,我们返回一个引发异常的ticket。一个额外的doOnError()回调允许我们记录异常------当然,我们也可以将日志记录添加到onErrorReturn()操作符中,但是我发现这种关注点的分离功能更强。
为了保持与以前的实现兼容,我们将Observable转换为Observable<List<Ticket>>,BlockingObservable<List<Ticket>>,toBlocking()以及最终的List<Ticket>( single() ).有趣的是,甚至BlockingObservable 保持这惰性。一个oBlocking() 操作符它本身并没有通过订阅底层流来强制进行评估,而且它甚至不阻塞。订阅和由此而来的迭代,以及发送电子邮件被推迟到single()被调用。
请注意,如果我们用concatMap()替换了外部的flatMap()(参见97页的“Ways of Combining Streams: concat(), merge(), and switchOnNext()”,在第75页上的”Preserving Order Using concatMap()”,那么我们将遇到类似于JDK流中提到的类似的bug。相对于立即订阅所有内部流的flatMap()(或merge),concatMap(或concat)一个又一个的订阅内部Observable。如果没有订阅者订阅到Observable,它甚至都不会开始工作。
到目前为止,一个简单的for循环和try - catch被替换为不太可读和更复杂的Observable 。然而,要将我们的序列代码转换为多线程计算,我们只需添加一个额外的操作符:
Observable
.from(tickets)
.flatMap(ticket ->
rxSendEmail(ticket)
.ignoreElements()
.doOnError(e -> log.warn("Failed to send {}", ticket, e))
.onErrorReturn(err -> ticket)
.subscribeOn(Schedulers.io()))
它是非侵入性的,你可能很难发现它。一个额外的subscribeOn()操作符会导致每个单独的rxSendMail()在指定的Scheduler上(在本例中是io() )被执行。这是RxJava的优势之一;它对于线程提供了很大的开放性,默认为同步执行,但是允许无缝和几乎透明的多线程。当然,这并不意味着您可以安全地在任意位置注入调度器。但是至少这个API没有那么详细和高级。我们将在稍后140页的“RxJava多线程”中更详细地研究调度器,而现在你只需要记住 Observable默认是同步的,但是,我们可以很容易地改变这种情况,并将并发应用到出乎意料的地方。这在现有的遗留应用程序中特别有价值,您可以在无需太多麻烦的情况下进行优化。
如果您是从头开始实现Observables的,默认情况下,使它们异步化更合乎习惯。这意味着在rxSendEmail()中直接放置subscribeOn(),而不是外部。否则,您可能会使用另一层调度器来包装已经异步的流。当然,如果Observable 背后的的生产者已经是异步的,那么就更好了,因为您的流不绑定到任何特定的线程。此外,你应该尽可能晚点的订阅到Observable。,尤其是接近我们外部世界的web框架。这大大改变了你的习惯,您的整个业务逻辑是惰性的,直到有人真正想看到结果为止。