系列中的上一篇
当前教程
流的归约
系列中的下一篇

系列中的上一篇: 创建流

系列中的下一篇: 在流上添加终止操作

流的归约

 

在流上调用终止操作

到目前为止,您在本教程中了解到,归约流包括以类似于 SQL 语言中所做操作的方式聚合流的元素。在您运行的示例中,您还使用 collect(Collectors.toList()) 模式将构建的流的元素收集到列表中。所有这些操作在流 API 中被称为终止操作,并且包括归约您的流。

在流上调用终止操作时,您需要记住两件事。

  1. 没有终止操作的流不会处理任何数据。如果您在应用程序中发现这样的流,它很可能是一个错误。
  2. 给定的流实例只能有一个中间或终止操作调用。您不能重复使用流;如果您尝试这样做,您将获得一个 IllegalStateException

 

使用二元运算符归约流

Stream 接口中定义了 reduce() 方法的三个重载。它们都将 BinaryOperator 对象作为参数。让我们检查一下如何使用此二元运算符。

让我们举个例子。假设您有一个整数列表,您需要计算这些整数的总和。您可以编写以下代码来计算此总和,使用经典的 for 循环模式。

List<Integer> ints = List.of(3, 6, 2, 1);

int sum = ints.get(0);
for (int index = 1; index < ints.size(); index++) {
    sum += ints.get(index);
}
System.out.println("sum = " + sum);

运行它会打印出以下结果。

sum = 12

这段代码的作用如下。

  1. 取列表的前两个元素并将它们加在一起。
  2. 然后取下一个元素,并将其加到您已经计算出的部分和中。
  3. 重复此过程,直到您到达列表的末尾。

这种计算方式可以在下图中总结。

Summing the Elements of a Stream

对流的元素求和

如果您仔细检查这段代码,您会发现您可以使用二元运算符对SUM运算符进行建模,以实现相同的结果。然后代码将变为以下内容。

List<Integer> ints = List.of(3, 6, 2, 1);
BinaryOperator<Integer> sum = (a, b) -> a + b;

int result = ints.get(0);
for (int index = 1; index < ints.size(); index++) {
    result = sum.apply(result, ints.get(index));
}
System.out.println("sum = " + result);

现在您可以看到,这段代码只依赖于二元运算符本身。假设您需要计算一个MAX。您需要做的就是为此提供正确的二元运算符。

List<Integer> ints = List.of(3, 6, 2, 1);
BinaryOperator<Integer> max = (a, b) -> a > b ? a: b;

int result = ints.get(0);
for (int index = 1; index < ints.size(); index++) {
    result = max.apply(result, ints.get(index));
}
System.out.println("max = " + result);

由此得出结论,您实际上可以通过只提供一个只对两个元素进行操作的二元运算符来计算归约。这就是流 API 中 reduce() 方法的工作原理。

 

选择可以在并行中使用的二元运算符

不过,您需要了解两个注意事项。让我们在这里介绍第一个,并在下一节介绍第二个。

第一个是流可以并行计算。这一点将在本教程的后面部分详细介绍,但我们现在需要讨论它,因为它对这个二元运算符有影响。

以下是流 API 中并行实现的方式。您的数据源被分成两部分,每部分分别处理。每个过程与您刚刚看到的过程相同,它使用您的二元运算符。然后,当每个部分处理完毕后,这两个部分结果将使用相同的二元运算符合并。

以下是这种计算方式。

Summing Elements of a Stream in Parallel

并行对流的元素求和

处理数据流非常容易:只需在给定流上调用 parallel() 即可。

让我们检查一下幕后的事情是如何运作的,为此,您可以编写以下代码。您只是在模拟如何并行进行计算。这当然是对并行流的过度简化版本,只是为了解释事情是如何运作的。

让我们创建一个 reduce() 方法,它接受一个二元运算符并使用它来归约一个整数列表。代码如下。

int reduce(List<Integer> ints, BinaryOperator<Integer> sum) {
    int result = ints.get(0);
    for (int index = 1; index < ints.size(); index++) {
        result = sum.apply(result, ints.get(index));
    }
    return result;
}

以下是使用此方法的主要代码。

List<Integer> ints = List.of(3, 6, 2, 1);
BinaryOperator<Integer> sum = (a, b) -> a + b;

int result1 = reduce(ints.subList(0, 2), sum);
int result2 = reduce(ints.subList(2, 4), sum);

int result = sum.apply(result1, result2);
System.out.println("sum = " + result);

为了明确起见,我们已将您的数据源分成两部分,并分别将其归约为两个整数:reduce1reduce2。然后我们使用相同的二元运算符合并了这些结果。这基本上是并行流的工作原理。

这段代码非常简化,它只是为了展示您的二元运算符应该具有的一个非常特殊的属性。您如何拆分流的元素不应影响计算结果。所有以下拆分都应该为您提供相同的结果

  • 3 + (6 + 2 + 1)
  • (3 + 6) + (2 + 1)
  • (3 + 6 + 2) + 1

这表明您的二元运算符应该具有一个众所周知的属性,称为结合律。传递给 reduce() 方法的二元运算符应该是结合的。

流 API 中 reduce() 方法重载版本的 JavaDoc API 文档指出,您作为参数提供的二元运算符必须是结合的。

如果不是会发生什么?好吧,这正是问题所在:它不会被编译器或 Java 运行时检测到。因此,您的数据将被处理,没有明显的错误。您可能得到正确的结果,也可能没有;这取决于您内部处理数据的方式。事实上,如果您多次运行代码,您最终可能会得到不同的结果。这是一个非常重要的点,您需要意识到。

如何测试您的二元运算符是否结合?在某些情况下,这可能非常容易:SUMMINMAX 是众所周知的结合运算符。在其他一些情况下,这可能要困难得多。检查该属性的一种方法是在随机数据上运行您的二元运算符,并验证您是否始终获得相同的结果。如果您没有,那么您就知道您的二元运算符不是结合的。如果您有,那么不幸的是,您无法可靠地得出结论。

 

管理没有身份元素的二元运算符

第二个是您二元运算符应该具有的结合律属性的结果。

这种结合律属性是由以下事实强加的:您划分数据的方式不应影响计算结果。如果您将集合A分成两个子集BC,那么归约A应该为您提供与归约B的归约和C的归约相同的结果。

您可以将前面的属性写成更一般的以下表达式

A = BC ⇒ Red(A) = Red(Red(B), Red(C))

事实证明,这会导致另一个结果。假设事情进展不顺利,并且B实际上是空的。在这种情况下,C = A。前面的表达式变为以下内容

Red(A) = Red(Red(∅), Red(A))

当且仅当空集 (∅) 的归约是归约操作的身份元素时,此表达式才为真。

这是数据处理中的一个普遍属性:空集的归约是归约操作的身份元素。

这在数据处理中确实是一个问题,尤其是在并行数据处理中,因为一些非常经典的归约二元运算符没有身份元素,即MINMAX。空集的最小元素没有定义,因为MIN操作没有身份元素。

这个问题必须在流 API 中得到解决,因为您可能需要处理空流。您看到了创建空流的模式,并且很容易看出 filter() 调用可以过滤掉您的流正在处理的所有数据,从而返回一个将没有要处理的数据的流。

流 API 中做出的选择如下。归约,其身份元素未知(要么不存在,要么未提供),将返回 Optional 类的实例。我们将在本教程的后面部分更详细地介绍此类。您现在需要知道的是,这个 Optional 类是一个可以为空的包装类。每次您在没有已知身份元素的流上调用终止操作时,流 API 都会将结果包装在该对象中。如果您处理的流为空,那么这个可选值也将为空,并且由您和您的应用程序决定如何处理这种情况。

 

探索流 API 的归约方法

正如我们之前提到的,流 API 有 reduce() 方法的三个重载,我们现在可以详细介绍它们。

使用身份元素归约

第一个接受一个身份元素和一个 BinaryOperator 实例。因为您提供的第一个参数已知是二元运算符的身份元素,所以实现可以使用它来简化计算。它不会取流的前两个元素来开始处理,而是不取任何元素,并从这个身份元素开始。使用的算法形式如下。

List<Integer> ints = List.of(3, 6, 2, 1);
BinaryOperator<Integer> sum = (a, b) -> a + b;
int identity = 0;

int result = identity;
for (int i: ints) {
    result = sum.apply(result, i);
}

System.out.println("sum = " + result);

您可以注意到,这种写法即使在您需要处理的列表为空的情况下也能很好地工作。在这种情况下,它将返回身份元素,这就是您需要的。

API 不会检查您提供的元素是否确实是二元运算符的单位元。提供一个不是单位元的元素将返回一个损坏的结果。

您可以在以下示例中看到这一点。

Stream<Integer> ints = Stream.of(0, 0, 0, 0);

int sum = ints.reduce(10, (a, b) -> a + b);
System.out.println("sum = " + sum);

您期望这段代码在控制台上打印值 0。因为 reduce() 方法调用的第一个参数不是二元运算符的单位元,因此结果实际上是错误的。运行这段代码将在您的控制台上打印以下内容。

sum = 10

以下是您应该使用的正确代码。

Stream<Integer> ints = Stream.of(0, 0, 0, 0);

int sum = ints.reduce(0, (a, b) -> a + b);
System.out.println("sum = " + sum);

此示例向您展示了传递错误的单位元在编译或运行代码时不会触发任何错误或异常。真正取决于您来确保您传递的对象确实是您的二元运算符的单位元。

测试此属性的方法与测试结合律相同。将您的候选单位元与尽可能多的值组合起来。如果您发现一个被组合改变的值,那么您的候选值就不正确。不幸的是,如果您找不到任何错误的组合,并不一定意味着您的候选值是正确的。

使用身份元素归约

reduce() 方法的第二个重载只接受一个 BinaryOperator 实例,没有单位元。正如预期的那样,它返回一个 Optional 对象,包装了约简的结果。您可以对可选对象做的最简单的事情就是打开它,看看里面是否有东西。

让我们举一个没有单位元的约简的例子。

Stream<Integer> ints = Stream.of(2, 8, 1, 5, 3);
Optional<Integer> optional = ints.reduce((i1, i2) -> i1 > i2 ? i1: i2);

if (optional.isPresent()) {
    System.out.println("result = " + optional.orElseThrow());
} else {
    System.out.println("No result could be computed");
}

运行这段代码会得到以下结果。

result = 8

请注意,这段代码使用 orElseThrow() 方法打开可选对象,这现在是首选方法。这种模式是在 Java SE 10 中添加的,用来替代最初在 Java SE 8 中引入的更传统的 get() 方法。

get() 方法的问题是,如果可选对象为空,它可能会抛出 NoSuchElementException。将此方法命名为 orElseThrow()get() 更受欢迎,因为它提醒您,如果您尝试打开一个空的可选对象,您将得到一个异常。

可选对象可以做很多事情,您将在本教程的后面学习。

在一个方法中融合映射和约简

第三个稍微复杂一些。它结合了内部映射和约简,以及多个参数。

让我们检查一下此方法的签名。

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner);

此方法使用类型 U,该类型在本地定义到此方法中,并由二元运算符使用。二元运算符的工作方式与 reduce() 方法的先前重载中的工作方式相同,只是它不应用于流的元素,而只是应用于它们的映射版本。

实际上,这种映射和约简本身被组合成一个操作:累加器。请记住,在本部分的开头,您看到约简是增量进行的,并且一次消耗一个元素。在每个点,约简操作的第一个参数是迄今为止消耗的所有元素的局部约简。

单位元是组合器的单位元。

这正是这里发生的事情。

假设您有一个 String 实例的流,您需要使用这种模式对所有这些字符串的长度求和。

组合器组合两个整数:迄今为止处理的字符串长度的局部和。因此,您需要提供的单位元是单位元:O。

累加器从流中获取一个元素,将其映射到一个整数(该字符串的长度),并将其添加到迄今为止计算的局部和中。

以下是算法的工作原理。

Fusing Reduction and Mapping

融合约简和映射

相应的代码如下。

Stream<String> strings = Stream.of("one", "two", "three", "four");

BinaryOperator<Integer> combiner = (length1, length2) -> length1 + length2;
BiFunction<Integer, String, Integer> accumulator =
        (partialReduction, element) -> partialReduction + element.length();

int result = strings.reduce(0, accumulator, combiner);
System.out.println("sum = " + result);

运行这段代码会产生以下结果。

sum = 15

在上面的示例中,映射器将仅仅是以下函数。

Function<String, Integer> mapper = String::length;

因此,您可以将累加器重写为以下模式。这种写法清楚地展示了映射(由映射器建模)和约简(由二元运算符建模)的融合。

Function<String, Integer> mapper = String::length;
BinaryOperator<Integer> combiner = (length1, length2) -> length1 + length2;

BiFunction<Integer, String, Integer> accumulator =
        (partialReduction, element) -> partialReduction + mapper.apply(element);

上次更新: 2021 年 9 月 14 日


系列中的上一篇
当前教程
流的归约
系列中的下一篇

系列中的上一篇: 创建流

系列中的下一篇: 在流上添加终止操作