并行流
优化流计算
流 API 的一个非常令人兴奋的功能是,流能够并行处理数据。使用流 API 并行处理数据就像在任何现有流上调用 parallel()
方法一样简单。
int parallelSum =
IntStream.range(0, 10)
.parallel()
.sum();
运行此代码将给出以下结果。
parallelSum = 45
实际上,这个总和是并行计算的。但是,您可能不会注意到这种小型示例中的任何性能提升。
为什么要并行计算数据?可能是为了更快地获得计算结果。并行流会比顺序流更快地给出结果吗?好吧,这个问题的答案并不像听起来那么简单。在某些情况下,答案是肯定的,但在其他一些情况下,不幸的是,答案是否定的。虽然听起来令人沮丧,但并行流并不总是比顺序流快。
考虑到这一点,您应该谨慎:选择使用并行流不是一个轻率的决定。在考虑并行之前,您需要问自己几个问题。
首先,问问自己,您需要它吗?您的应用程序是否有一些未满足的性能要求?您确定您的性能问题来自您正在考虑并行计算的流处理吗?您打算如何衡量性能提升,以确保并行执行此特定计算提高了应用程序的性能?
并行执行会消耗更多计算能力。您有空闲的 CPU 或 CPU 内核可以分配给此计算吗?您可以在不减慢应用程序其他部分的情况下,为您的计算提供更多 CPU 周期吗?
并行执行会消耗线程。您有空闲的线程可以分配给您的计算吗?如果您正在使用运行在 Web 服务器上的应用程序,那么您的线程用于处理 HTTP 请求。您是否可以将它们用于其他用途?
一旦您选择并行执行,那么您需要确保流计算的性能确实有所提高。您应该在尽可能接近生产环境的上下文中衡量这种性能提升。
在本教程中,我们将介绍几个关键要素,这些要素将帮助您评估并行执行可能带来的收益,以及一些其他要素,这些要素应该让您对并行执行保持警惕。但最终,唯一能告诉您并行执行是否值得的因素是测试和衡量执行时间。
并行化实现
并行化在流 API 中通过使用流正在处理的数据的递归分解来实现。它建立在 JDK 7 中添加的 Fork/Join 框架之上。
分解包括将流正在处理的数据分成两部分。然后,每个部分由其自己的 CPU 内核处理,该内核可以递归地决定再次将其分解。
在某个时刻,框架将决定给定部分中的数据量足够小,可以正常处理。然后将处理此数据子集,并计算部分结果。然后,将此部分结果与在其他 CPU 内核上从其他部分计算出的其他部分结果合并。
并行执行确实会带来开销。此开销必须比将计算分布到多个 CPU 内核的收益小。如果不是,那么并行执行将使计算的性能变差,而不是提高性能。
让我们逐一检查所有这些步骤,看看什么会阻止您获得更好的性能提升。
理解数据局部性
数据局部性会影响数据的处理速度,无论它是顺序处理还是并行处理。局部性越好,计算速度越快。
为了使数据可供 CPU 使用,必须将数据从计算机的主内存传输到 CPU 的缓存。从物理上讲,主内存是计算机的特定组件,与 CPU 分开。另一方面,缓存与 CPU 的核心计算元素共享相同的硅片。它们通过主板和不同的通信总线连接在一起。与 CPU 内核从其缓存访问数据的速度相比,将数据从主内存传输到 CPU 的缓存非常慢。
当您的 CPU 需要一些数据时,它首先检查此数据是否在其缓存中可用。如果是,那么它可以立即使用它。如果不是,那么必须从主内存中获取此数据并将其复制到缓存中。这种情况称为缓存未命中。缓存未命中代价高昂,因为在此期间,您的 CPU 正在等待您的数据。您希望避免这种情况。
数据在主内存和 CPU 缓存之间传输的方式在避免缓存未命中方面起着重要作用。内存按行组织。通常,一行长 64 字节,即八个 long
值(它可能因 CPU 而异)。主内存和 CPU 缓存之间所有传输都是逐行进行的。因此,即使您的 CPU 只需要一个 int
值,包含此值的完整行也会被传输到缓存中。
遍历基本类型数组
假设您的代码正在遍历一个 int[]
类型的数组。一行 64 字节可以容纳 16 个 int
值。假设访问数组的第一个元素是缓存未命中。然后,CPU 将加载包含此元素的行到其缓存中以开始迭代。因为它加载了一整行,所以接下来的 15 个值也可能被传输。访问接下来的值将非常快。
在这种情况下,数据局部性非常好:您的数据在主内存的连续区域中物理存储。这是可取的,因为将您的数据从主内存传输到 CPU 缓存将快得多。
遍历整数实例数组
现在假设您的代码正在遍历一个 Integer[]
类型的数组。您实际上不再拥有基本类型的数组,而是拥有引用数组。此数组的每个单元格都包含对 Integer
类型的对象的引用,该对象可以在内存中的任何位置。
如果访问数组的第一个元素是缓存未命中,那么 CPU 将不得不将其加载包含此元素的行到其缓存中。它实际上加载的是数组的前 16 个引用,假设第一个引用位于行的开头。然后它必须加载第一个 Integer
对象,该对象可能位于主内存中的其他位置,从而导致另一个缓存未命中。事实上,很有可能数组的每个 Integer
对象的读取也会是缓存未命中。
在这种情况下,数据局部性不如前面的示例好:对数据的引用在主内存的连续区域中物理存储,但您需要进行计算的值却不是。这是不可取的,因为将您需要的值从主内存传输到 CPU 缓存比数组基本类型的情况慢得多。
遍历整数实例的链表
让我们来考察最后一种情况。假设现在您的代码正在迭代一个类型为 LinkedList<Integer>
的列表。如果对第一个元素的访问是缓存未命中,那么 CPU 将把您的链表的第一个节点加载到其缓存中。该节点包含两个引用:第一个指向您计算所需的数值,第二个指向列表的下一个节点。这种情况比前一种情况更糟糕:访问列表的下一个值的可能性很大,会导致两次缓存未命中。
在这种情况下,数据局部性很差:您的数据及其引用都没有存储在主内存的连续区域中。访问您需要的元素将比我们检查的第一种情况慢得多。
避免指针追逐
必须遵循引用或指针才能到达包含您需要数据的正确元素,这被称为指针追逐。指针追逐是您在应用程序中想要避免的,并且是许多性能下降的根源。在迭代 int
值数组时,指针追逐不存在。当迭代 Integer
实例的链表时,它构成了您的主要性能下降。
拆分数据源
如果您决定并行处理流,第一步将包括拆分您的数据源。为了使拆分有效,它应该具有几个属性。
- 拆分数据结构应该简单快捷。
- 拆分应该均匀:您得到的两个子流应该具有相同的数据量来处理。
拆分集合实例
一个 ArrayList
是一个完美的拆分数据结构。您可以轻松地获取中间元素,如果您通过中间拆分数组,您就会确切地知道两个子数组中将有多少个元素。
另一方面,一个 LinkedList
不是一个好的拆分结构。要到达中间元素,需要逐个遍历列表的一半元素,这很昂贵,因为指针追逐。到达那里后,您可以获得具有正确元素数量的两个子列表。
一个 HashSet
是建立在桶数组上的,因此拆分此数组与拆分数组列表的内部数组相同。但是数据在此数组中的存储方式不同。以确保两部分具有相同数量的元素的方式拆分此数组更难。您甚至可能最终得到一个空的子部分。
一个 TreeSet
基于红黑树实现。它保证所有节点在其左右子节点上具有相同数量的元素。因此,将 TreeSet
实例拆分为两个均匀的子树很容易。但是,您仍然需要追逐指针才能到达您的数据。
所有这些结构都用于集合框架,您可以获得它们各自包含的元素数量。
并非所有可以从中创建流的结构都是如此。
拆分文本文件的行
对于 Files.lines(path)
模式,情况就是这样,该模式在本教程的前面部分已经介绍过。它创建一个流,该流处理由此 path
对象表示的文本文件中的行。如果不分析文本文件,就不可能获得文本文件中的行数。
对于我们也介绍过的 Pattern.splitAsStream(line)
模式,情况也是如此。它使用提供的模式从 line
的拆分中创建一个流。同样,您无法提前知道您将在这样的流中处理多少个元素。
拆分范围或生成的流
专门的数字流也为您提供了创建流的模式。
IntStream.range(0, 10)
流很容易拆分。事实上,它看起来像一个数字数组,您可以通过中间拆分它。每个部分中的元素数量是可以预测的,这是可取的。
另一方面,Stream.generate()
和 Stream.iterate()
方法不会为您提供一个易于拆分的數據源。事实上,此源可能是无限的,并且仅受其在流中的处理方式限制。
让我们比较以下两种模式。
List<Integer> list1 =
IntStream.range(0, 10).boxed()
.toList();
List<Integer> list2 =
IntStream.iterate(0, i -> i + 1)
.limit(10).boxed()
.toList();
列表 list1
和 list2
都是相同的,使用不同的模式创建。第一个易于拆分,而第二个则不易拆分。主要原因是,在第二种模式中,了解第五个元素的值需要计算所有先前的元素。从这个意义上说,这个第二种模式看起来像一个链表,您需要访问前四个元素才能到达第五个元素。
拆分和调度工作
一旦您的数据源被拆分,那么两个子流必须在 CPU 的不同核心上处理,才能使并行化有效。
这是由 Fork/Join 框架完成的。Fork/Join 框架处理一个线程池,该线程池在您的应用程序启动时创建,称为通用 Fork/Join 池。此池中的线程数量与您的 CPU 拥有的核心数量一致。此池中的每个线程都有一个等待队列,线程可以在其中存储任务。
池中的第一个线程创建一个第一个任务。此任务的执行决定了计算是否足够小以顺序计算,或者是否太大,应该拆分。
如果它被拆分,那么两个子任务将被创建并存储在该线程的队列中。然后主任务等待两个子任务完成。在等待时,它也被存储在这个等待队列中。
如果计算被执行,那么将产生一个结果。此结果是整个计算的部分结果。然后,此任务将结果返回给创建它的主任务。
一旦一个任务获得了它创建的两个子任务的两个结果,它就可以将它们合并以产生一个结果并将其返回给创建它的主任务。
在给定时间点,第一个主任务从其两个子任务获得两个部分结果。然后它能够将它们合并并返回计算的最终结果。
目前,唯一工作的线程是池中的第一个线程,它是由 Fork/Join 框架调用的。Fork/Join 框架实现了另一种并发编程模式,称为工作窃取。池中的空闲线程可以检查同一池中其他线程的等待队列,以获取任务并处理它。
这就是在这种情况下发生的事情。一旦第一个等待队列中的任务数量增加,其他线程将窃取其中一些任务,处理它们,进一步拆分工作,并用更多任务填充它们自己的等待队列。此功能使池中的所有线程保持繁忙。
此工作窃取功能运行良好,但它有一个缺点:根据您的源代码的拆分方式以及任务从一个线程移动到另一个线程的方式,您的数据可能会以任何顺序处理。在某些情况下,这可能是一个问题。
处理子流
处理子流可能与处理完整流不同。两个元素可以使子流的处理不同:访问外部状态,以及将状态从一个元素的处理传递到另一个元素。这两个元素将影响并行流的性能。
访问外部状态
Fork/Join 框架将您的计算拆分为许多子任务,每个子任务由池中的一个线程处理。
如果您顺序处理流,所有元素都在运行您的方法的线程中处理。如果您并行处理同一个流,元素将由通用 Fork/Join 池中的一个线程处理。
然后,从另一个线程访问流外部的状态可能会导致竞争条件。
让我们在经典的 main
方法中运行以下代码。
Set<String> threadNames =
IntStream.range(0, 100)
// .parallel()
.mapToObj(index -> Thread.currentThread().getName())
.collect(Collectors.toSet());
System.out.println("Thread names:");
threadNames.forEach(System.out::println);
它产生的结果如下。
Thread names:
main
如果您取消对 parallel()
调用的注释,那么此流将并行执行。结果将变为以下内容,并且可能在您的机器上有所不同。
Thread names:
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-4
main
ForkJoinPool.commonPool-worker-5
对非并发外部元素的任何访问都可能导致竞争条件和数据不一致。让我们运行以下代码。
List<Integer> ints = new ArrayList<>();
IntStream.range(0, 1_000_000)
.parallel()
.forEach(ints::add);
System.out.println("ints.size() = " + ints.size());
多次运行此代码可能会导致不同的结果,因为通用 Fork/Join 池中的所有线程都试图在 ArrayList
实例中并发添加数据,而 ArrayList
不是线程安全的结构。看到正确结果的机会很小,您甚至可能得到一个 ArrayIndexOutOfBoundsException
。使用任何非并发集合或映射运行此类代码会导致不可预测的结果,包括异常。
对于流来说,修改流外部的状态是一种反模式。
遇到顺序
在 Stream API 中,数据处理的顺序在某些情况下至关重要。以下方法就是这种情况。
limit(n)
: 将处理限制为此流的n
个第一个元素。skip(n)
: 跳过此流的n
个第一个元素的处理。findFirst()
: 查找流的第一个元素。
这三种方法需要记住流元素的处理顺序,并需要对元素进行计数才能产生正确的结果。
它们被称为有状态操作,因为它们需要维护内部状态才能工作。
拥有这样的有状态操作会导致并行流的开销。例如,limit()
需要一个内部计数器才能正常工作。在并行情况下,这个内部计数器在不同的线程之间共享。在线程之间共享可变状态代价很高,应该避免。
理解并行计算流的开销
并行计算流会增加一些处理并行的计算。这些元素是有成本的,你需要了解它们,以确保与并行带来的好处相比,这个成本不会过高。
- 你的数据需要被分割。分割可能很便宜,也可能很昂贵,这取决于你处理的数据。数据位置不佳会导致分割变得昂贵。
- 分割需要高效。它需要创建均匀分割的子流。一些数据源可以很容易地均匀分割,而另一些则不行。
- 分割后,实现会并发地处理你的数据。你应该避免访问任何外部可变状态,也应该避免拥有内部共享可变状态。
- 然后,部分结果需要合并。有些结果可以很容易地合并。合并整数的总和很容易也很便宜。合并集合也很容易。合并哈希映射的成本更高。
说明一些正确使用并行流的规则
规则 #1 不要因为好玩而优化;优化是因为你有需求,而你没有满足这些需求。
规则 #2 谨慎选择你的数据源。
规则 #3 不要修改外部状态,也不要共享可变状态。
规则 #4 不要猜测;测量代码的性能。
最后更新: 2021 年 9 月 14 日