高效使用并行流
一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。
- 如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
- 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流( IntStream 、LongStream 、 DoubleStream )来避免这种操作,但凡有可能都应该用这些流。
- 有些操作本身在并行流上的性能就比顺序流差。特别是 limit 和 findFirst 等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如, findAny 会比 findFirst 性能好,因为它不一定要按顺序来执行。你总是可以调用 unordered 方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit 可能会比单个有序流(比如数据源是一个 List )更高效。
- 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
- 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
- 要考虑流背后的数据结构是否易于分解。例如, ArrayList 的拆分效率比 LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用 range 工厂方法创建的原始类型流也可以快速分解。
- 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个 SIZED 流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
- 还要考虑终端操作中合并步骤的代价是大是小(例如 Collector 中的 combiner 方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
最后,我们还要强调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。并行汇总的示例证明了要想正确使用并行流,了解它的内部原理至关重要,所以我们会在下一节仔细研究分支/合并框架。
分支/合并框架
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。首先来看看如何定义任务和子任务。
使用 RecursiveTask
要把任务提交到这个池,必须创建 RecursiveTask 的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型(当然它可能会更新其他非局部机构)。要定义 RecursiveTask, 只需实现它唯一的抽象方法compute :
protected abstract R compute(); 复制代码复制代码
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:
if (任务足够小或不可分) { 顺序计算该任务 } else { 将任务分成两个子任务 递归调用本方法,拆分每个子任务,等待所有子任务完成 合并每个子任务的结果 } 复制代码复制代码
一般来说并没有确切的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你做出这一决定。
你可能已经注意到,这只不过是著名的分治算法的并行版本而已。这里举一个用分支/合并框架的实际例子,还以前面的例子为基础,让我们试着用这个框架为一个数字范围(这里用一个long[] 数组表示)求和。如前所述,你需要先为RecursiveTask类做一个实现,就是下面代码清单中的ForkJoinSumCalculator 。
用分支/合并框架执行并行求和:
public class ForkJoinSumCalculator extends RecursiveTask<Long> { /** * 不再将任务分解为子任务的数组大小 */ public static final long THRESHOLD = 10_000; /** * 要求和的数组 */ private final long[] numbers; /** * 子任务处理的数组的起始和终止位置 */ private final int start; private final int end; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { // 该任务负责求和的部分的大小 int length = end - start; // 如果大小小于或等于阈值,顺序计算结果 if (length <= THRESHOLD) { return computeSequentially(); } // 创建一个子任务来为数组的前一半求和 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); leftTask.fork(); // 利用另一个ForkJoinPool线程异步执行新创建的子任务 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); // 同步执行第二个子任务,有可能允许进一步递归划分 Long rightResult = rightTask.compute(); // 读取第一个子任务的结果,如果尚未完成就等待 Long leftResult = leftTask.join(); // 该任务的结果是两个子任务结果的组合 return leftResult + rightResult; } private Long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } } 复制代码复制代码
现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:
public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); } 复制代码复制代码
这里用了一个 LongStream 来生成包含前n个自然数的数组,然后创建一个 ForkJoinTask( RecursiveTask 的父类),并把数组传递 ForkJoinSumCalculator 的公共构造函数。最后,你创建了一个新的 ForkJoinPool ,并把任务传给它的调用方法 。在ForkJoinPool 中执行时,最后一个方法返回的值就是 ForkJoinSumCalculator 类定义的任务结果。
请注意在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。
运行 ForkJoinSumCalculator
当把 ForkJoinSumCalculator 任务传给 ForkJoinPool 时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的 ForkJoinSumCalculator ,而它们也由ForkJoinPool 安排执行。因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。
你可以再用一次本章开始时写的测试框架,来看看显式使用分支/合并框架的求和方法的性能:
System.out.println("ForkJoin sum done in: " + measurePerf( ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs"); 复制代码复制代码
它生成以下输出:
ForkJoin sum done in: 41 msecs 复制代码复制代码
这个性能看起来比用并行流的版本要差,但这只是因为必须先要把整个数字流都放进一个long[] ,之后才能在 ForkJoinSumCalculator 任务中使用它。
使用分支/合并框架的最佳做法
虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。以下是几个有效使用它的最佳做法。
- 对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
- 不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。
- 对子任务调用 fork 方法可以把它排进 ForkJoinPool 。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用 compute 低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
- 调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支合并计算上就不行了,因为调用 compute的线程并不是概念上的调用方,后者是调用 fork 的那个。
- 和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死码分析——删去从未被使用的计算)。
对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步拆分还是已小到可以顺序求值。
工作窃取
在 ForkJoinSumCalculator 的例子中,我们决定在要求和的数组中最多包含10 000个项目时就不再创建子任务了。这个选择是很随意的,但大多数情况下也很难找到一个好的启发式方法来确定它,只能试几个不同的值来尝试优化它。在我们的测试案例中,我们先用了一个有1000万项目的数组,意味着 ForkJoinSumCalculator 至少会分出1000个子任务来。这似乎有点浪费资源,因为我们用来运行它的机器上只有四个内核。在这个特定例子中可能确实是这样,因为所有的任务都受CPU约束,预计所花的时间也差不多。
但分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如磁盘访问慢,或是需要和外部服务协调执行。
分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。
一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。
现在你应该清楚流如何使用分支/合并框架来并行处理它的项目了,不过还有一点没有讲。本节中我们分析了一个例子,你明确地指定了将数字数组拆分成多个任务的逻辑。但是,使用本章前面讲的并行流时就用不着这么做了,这就意味着,肯定有一种自动机制来为你拆分流。这种新的自动机制称为 Spliterator ,我们会在下一节中讨论。
Spliterator
Spliterator 是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitableiterator)。和 Iterator 一样, Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。虽然在实践中可能用不着自己开发 Spliterator ,但了解一下它的实现方式会让你对并行流的工作原理有更深入的了解。Java 8已经为集合框架中包含的所有数据结构提供了一个默认的 Spliterator 实现。集合实现了 Spliterator 接口,接口提供了一个 spliterator 方法。这个接口定义了若干方法,如下面的代码清单所示。
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); } 复制代码复制代码
与往常一样, T 是 Spliterator 遍历的元素的类型。 tryAdvance 方法的行为类似于普通的Iterator ,因为它会按顺序一个一个使用 Spliterator 中的元素,并且如果还有其他元素要遍历就返回 true 。但 trySplit 是专为 Spliterator 接口设计的,因为它可以把一些元素划出去分给第二个 Spliterator (由该方法返回),让它们两个并行处理。 Spliterator 还可通过estimateSize 方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。
重要的是,要了解这个拆分过程在内部是如何执行的,以便在需要时能够掌控它。因此,我们会在下一节中详细地分析它。
拆分过程
将 Stream 拆分成多个部分的算法是一个递归过程。第一步是对第一个Spliterator 调用 trySplit ,生成第二个 Spliterator 。第二步对这两个 Spliterator 调用trysplit ,这样总共就有了四个 Spliterator 。这个框架不断对 Spliterator 调用 trySplit直到它返回 null ,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的 Spliterator 在调用 trySplit 时都返回了 null 。
这个拆分过程也受 Spliterator 本身的特性影响,而特性是通过 characteristics 方法声明的。
实现你自己的 Spliterator
让我们来看一个可能需要你自己实现 Spliterator 的实际例子。我们要开发一个简单的方法来数数一个 String 中的单词数。这个方法的一个迭代版本可以写成下面的样子。
public static int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) { counter++; } lastSpace = Character.isWhitespace(c); } } return counter; } 复制代码复制代码
让我们把这个方法用在但丁的《神曲》的《地狱篇》的第一句话上:
public static final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " che la dritta via era smarrita "; System.out.println("Found " + countWordsIteratively(SENTENCE) + " words"); 复制代码复制代码
请注意,我们在句子里添加了一些额外的随机空格,以演示这个迭代实现即使在两个词之间存在多个空格时也能正常工作。正如我们所料,这段代码将打印以下内容:
Found 19 words 复制代码复制代码
理想情况下,你会想要用更为函数式的风格来实现它,因为就像我们前面说过的,这样你就可以用并行 Stream 来并行化这个过程,而无需显式地处理线程和同步问题。
- 以函数式风格重写单词计数器
首先你需要把 String 转换成一个流。不幸的是,原始类型的流仅限于 int 、 long 和 double , 所以你只能用 Stream :
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); 复制代码复制代码
你可以对这个流做归约来计算字数。在归约流时,你得保留由两个变量组成的状态:一个 int用来计算到目前为止数过的字数,还有一个 boolean 用来记得上一个遇到的 Character 是不是空格。因为Java没有元组(tuple,用来表示由异类元素组成的有序列表的结构,不需要包装对象),所以你必须创建一个新类 WordCounter 来把这个状态封装起来,如下所示。
private static class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } } 复制代码复制代码
在这个列表中, accumulate 方法定义了如何更改 WordCounter 的状态,或更确切地说是用哪个状态来建立新的 WordCounter ,因为这个类是不可变的。每次遍历到 Stream 中的一个新的Character 时,就会调用 accumulate 方法。具体来说,就像 countWordsIteratively 方法一样,当上一个字符是空格,新字符不是空格时,计数器就加一。
调用第二个方法 combine 时,会对作用于 Character 流的两个不同子部分的两个WordCounter 的部分结果进行汇总,也就是把两个 WordCounter 内部的计数器加起来。
private static int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); } 复制代码复制代码
现在你就可以试一试这个方法,给它由包含但丁的《神曲》中《地狱篇》第一句的 String创建的流:
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); System.out.println("Found " + countWords(stream) + " words"); 复制代码复制代码
你可以和迭代版本比较一下输出:
Found 19 words 复制代码复制代码
到现在为止都很好,但我们以函数式实现 WordCounter 的主要原因之一就是能轻松地并行处理,让我们来看看具体是如何实现的。
- 让 WordCounter 并行工作
你可以尝试用并行流来加快字数统计,如下所示:
System.out.println("Found " + countWords(stream.parallel()) + " words"); 复制代码复制代码
不幸的是,这次的输出是:
Found 25 words 复制代码复制代码
显然有什么不对,可到底是哪里不对呢?问题的根源并不难找。因为原始的 String 在任意位置拆分,所以有时一个词会被分为两个词,然后数了两次。这就说明,拆分流会影响结果,而把顺序流换成并行流就可能使结果出错。
如何解决这个问题呢?解决方案就是要确保 String 不是在随机位置拆开的,而只能在词尾拆开。要做到这一点,你必须为 Character 实现一个 Spliterator ,它只能在两个词之间拆开String (如下所示),然后由此创建并行流。
private static class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } } 复制代码复制代码
这个 Spliterator 由要解析的 String 创建,并遍历了其中的 Character ,同时保存了当前正在遍历的字符位置。让我们快速回顾一下实现了Spliterator接口的WordCounterSpliterator 中的各个函数。
- tryAdvance 方法把 String 中当前位置的 Character 传给了 Consumer ,并让位置加一。作为参数传递的 Consumer 是一个Java内部类,在遍历流时将要处理的 Character 传给了一系列要对其执行的函数。这里只有一个归约函数,即 WordCounter 类的 accumulate方法。如果新的指针位置小于 String 的总长,且还有要遍历的 Character ,则tryAdvance 返回 true 。
- trySplit 方法是 Spliterator 中最重要的一个方法,因为它定义了拆分要遍历的数据结构的逻辑。就像 RecursiveTask 的 compute 方法一样(分支/合并框架的使用方式),首先要设定不再进一步拆分的下限。这里用了一个非常低的下限——10个 Character ,仅仅是为了保证程序会对那个比较短的 String 做几次拆分。在实际应用中,就像分支/合并的例子那样,你肯定要用更高的下限来避免生成太多的任务。如果剩余的 Character 数量低于下限,你就返回 null 表示无需进一步拆分。相反,如果你需要执行拆分,就把试探的拆分位置设在要解析的 String 块的中间。但我们没有直接使用这个拆分位置,因为要避免把词在中间断开,于是就往前找,直到找到一个空格。一旦找到了适当的拆分位置,就可以创建一个新的 Spliterator 来遍历从当前位置到拆分位置的子串;把当前位置 this 设为拆分位置,因为之前的部分将由新Spliterator 来处理,最后返回。
- 还需要遍历的元素的 estimatedSize 就是这个 Spliterator 解析的 String 的总长度和当前遍历的位置的差。
- 最后, characteristic 方法告诉框架这个 Spliterator 是 ORDERED (顺序就是 String中各个 Character 的次序)、 SIZED ( estimatedSize 方法的返回值是精确的)、SUBSIZED ( trySplit 方法创建的其他 Spliterator 也有确切大小)、 NONNULL ( String中 不 能 有 为 null 的 Character ) 和 IMMUTABLE ( 在 解 析 String 时 不 能 再 添 加Character ,因为 String 本身是一个不可变类)的。
- 运用 WordCounterSpliterator
现在就可以用这个新的 WordCounterSpliterator 来处理并行流了,如下所示:
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true); 复制代码复制代码
传给 StreamSupport.stream 工厂方法的第二个布尔参数意味着你想创建一个并行流。把这个并行流传给 countWords 方法:
System.out.println("Found " + countWords(stream.parallel()) + " words"); 复制代码复制代码
可以得到意料之中的正确输出:
Found 19 words 复制代码复制代码
你已经看到了 Spliterator 如何让你控制拆分数据结构的策略。 Spliterator 还有最后一个值得注意的功能,就是可以在第一次遍历、第一次拆分或第一次查询估计大小时绑定元素的数据源,而不是在创建时就绑定。这种情况下,它称为延迟绑定(late-binding)的 Spliterator 。
总结
- 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
- 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
- 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。
- 从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎总是比尝试并行化某些操作更为重要。
- 分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。
- Spliterator 定义了并行流如何拆分它要遍历的数据。