让我们改变一下,讨论来自标准JDK的并行流。在Java 8中,当您转换一个中等大小的对象集合时,您可以用可选的并行性对它们进行声明式的转换:
List<Person> people = //...
List<String> sorted = people
.parallelStream()
.filter(p -> p.getAge() >= 18)
.map(Person::getFirstName)
.sorted(Comparator.comparing(String::toLowerCase))
.collect(toList());
请注意前面代码片段中的parallelStream()而不是常规stream()。通过使用parallelStream(),我们要求像collect()这样的终端操作并行执行,而不是按顺序执行。当然,这不会对结果产生任何影响,但应该会更快。在引擎盖下,parallelStream()所做的是将一个输入集合分割成多个块,并并行地调用每个操作,然后以分而治之的精神思想将结果合并成一个。
许多运算符很容易并行化——例如,map()和filter()-其他的操作比较困难(像sorted()),因为在分别对每个块进行排序后,我们必须将它们合并在一起,在排序的情况下意味着合并两个排序的序列。如果没有进一步的假设,一些操作在本质上是很难或不可能并行化的。例如,reduce()只能在累积函数是组合的的情况下执行。
PS:Same Results?
在串行stream()和parallelStream()中,有一些运算符可以产生不同的结果。例如,findFirst()操作符返回在流中遇到的第一个元素。另一方面,findAny()操作符的存在似乎也是一样的。但是,虽然findFirst()总是返回来自流的第一个值,但在并行流上执行时,findAny()可以自由返回任何值。
当在findFirst()或findAny()之前使用filter()操作符时,就会出现这种情况。parallelStream()的执行可以自由地将输入流分成两半,并分别在每个半部分上并行执行过滤。如果过滤后半部分产生任何匹配值,则findAny()将返回它,即使在前半部分中也存在一些匹配值。findFirst()保证在全局上返回第一个匹配值,因此必须等待两个部分的过滤结果。这两种方法都有其优点,应该加以利用。
理想情况下,将个四核CPU的的机器上考虑阿姆达尔定律,我们可以预期执行速度将提高4倍。但是并行流也有其缺点。首先,对于小的流和短的转换管道,上下文切换的代价可能是重要的,因为并行流比串行流要慢。过于细粒度的并发性问题也可能发生在RxJava中,因此它通过调度器来支持声明性并发(参见141页的“What Is a Scheduler?”)。并行流的情况是不同的。
有没有想过为什么这个框架被称为并行而不是并发流呢?并行流仅为CPU密集型工作而设计,并且有一个硬编码的线程池(确切地说是ForkJoinPool),它与我们拥有的cpu数量一致。这个池是静态和全局的,它处于ForkJoinPool.commonPool()下。在JVM中,每个并行流以及一些CompletableFuture的回调都共享这个ForkJoinPool。整个JVM中的所有并行流(如果您将WAR文件部署到应用服务器上,那么即表示在多个应用程序中)共享同一个小的线程池。这通常很好,因为并行流是为并行任务设计的,而这些任务实际上需要100%的CPU时间。因此,如果并发的调用多个并行流,那么无论如何,它们都要争用CPU。
但是设想一个自私的应用程序在并行流中运行I/O操作:
//DON'T DO THIS
people
.parallelStream()
.forEach(this::publishOverJms);
publishOverJms()向流中的每个person 发送一个JMS消息。我们特意选择了JMS发送。它看起来非常快,但是由于交付的保证,JMS发送很可能会接触网络(通知消息代理)或磁盘(在本地持久化消息)。这微小的I / O延时足以持有宝贵的ForkJoinPool.commonPool()线程过长的时间。即使这个程序没有使用CPU,JVM中也没有其他代码可以执行并行流。现在想象一下,如果这不是通过JMS发送,而是通过web服务检索数据或操作昂贵的数据库查询。parallelStream()只能用于完全cpu绑定的任务,否则JVM的性能会受到很大影响。
这并不意味着并行流是不好的。然而,由于为其提供的是固定的线程池,所以它们的使用非常有限。当然,来自JDK的并行流并不能替代Observable.flatMap()或其他并发机制。并行流在执行时效果最好,…在并行执行的时候。但是并发任务不需要100%的CPU时间——例如,在网络或磁盘上被阻塞——最好使用其他机制。
了解了流的限制,我们就可以比较futures 和RxJava,以了解它们最适合的位置。