当前教程
使用 Stream API 在内存中处理数据
系列中的下一个

系列中的下一个: 在 Stream 上添加中间操作

使用 Stream API 在内存中处理数据

 

介绍 Stream API

Stream API 可能是继 lambda 表达式之后,添加到 Java SE 8 的第二个最重要的功能。简而言之,Stream API 的目标是为 JDK 提供一个著名的 map-filter-reduce 算法的实现。

集合框架用于在 JVM 的内存中存储和组织数据。您可以将 Stream API 看作是集合框架的配套框架,用于以非常高效的方式处理这些数据。实际上,您可以对集合打开一个流来处理它包含的数据。

它并不止于此:Stream API 可以为您做更多的事情,而不仅仅是处理来自集合的数据。JDK 为您提供了多种模式,可以在其他来源(包括 I/O 来源)上创建流。此外,您可以轻松地创建自己的数据源,以完美地满足您的需求。

当您掌握 Stream API 后,您就可以编写非常具有表现力的代码。以下是一小段代码,您可以使用正确的静态导入进行编译

List<String> strings = List.of("one","two","three","four");
var map = strings.stream()
                 .collect(groupingBy(String::length, counting()));
map.forEach((key, value) -> System.out.println(key + " :: " + value));

此代码打印出以下内容。

运行此代码会产生以下结果。

3 :: 2
4 :: 1
5 :: 1

即使您不熟悉 Stream API,阅读使用它的代码也能让您一目了然地了解它的作用。

 

介绍 Map-Filter-Reduce 算法

在深入了解 Stream API 本身之前,让我们看看您需要了解的 map-filter-reduce 算法的元素。

此算法是处理数据的经典算法。让我们举个例子。假设您有一组 Sale 对象,它们具有三个属性:日期、产品参考和金额。为简单起见,我们假设金额只是一个整数。以下是您的 Sale 类。

public class Sale {
    private String product;
    private LocalDate date;
    private int amount;

    // constructors, getters, setters
    // equals, hashCode, toString
}

假设您需要计算 3 月份的销售总额。您可能会编写以下代码。

List<Sale> sales = ...; // this is the list of all the sales
int amountSoldInMarch = 0;
for (Sale sale: sales) {
    if (sale.getDate().getMonth() == Month.MARCH) {
        amountSoldInMarch += sale.getAmount();
    }
}
System.out.println("Amount sold in March: " + amountSoldInMarch);

您可以在这个简单的數據处理算法中看到三个步骤。

第一步是只考虑 3 月份发生的销售。您正在根据给定条件过滤掉一些您正在处理的元素。这正是过滤步骤。

第二步是从 sale 对象中提取一个属性。您对整个对象不感兴趣;您需要的是它的 amount 属性。您正在将 sale 对象映射到一个金额,即一个 int 值。这是映射步骤;它包括将您正在处理的对象转换为其他对象或值。

最后一步是将所有这些金额加起来得到一个金额。如果您熟悉 SQL 语言,您会发现最后一步看起来像一个聚合。实际上,它确实做了同样的事情。这个总和是对各个金额的减少,得到一个金额。

顺便说一句,SQL 语言在以可读的方式表达这种处理方面做得很好。您需要的 SQL 代码非常容易阅读

select sum(amount)
from Sales
where extract(month from date) = 3;

 

指定结果而不是编程算法

您可以看到,在 SQL 中,您编写的是对所需结果的描述:3 月份所有销售的金额总和。您的数据库服务器负责弄清楚如何有效地计算它。

计算此金额的 Java 代码片段是对如何计算此金额的逐步描述。它以命令式的方式进行了精确描述。它给 Java 运行时留下了很少的优化此计算的空间。

Stream API 的两个目标是使您能够创建更易读和更具表现力的代码,并为 Java 运行时提供一些优化计算的空间。

 

将对象映射到其他对象或值

map-filter-reduce 算法的第一步是映射步骤。映射包括转换您正在处理的对象或值。映射是一对一的转换:如果您映射一个包含 10 个对象的列表,您将得到一个包含 10 个已转换对象的列表。

在 Stream API 中,映射步骤添加了另一个约束。假设您正在处理一个有序对象的集合。它可以是一个列表,或者其他一些有序对象的来源。当您映射该列表时,您获得的第一个对象应该是源中第一个对象的映射。换句话说:映射步骤尊重对象的顺序;它不会打乱它们。

映射会更改对象的类型;它不会更改对象的数量。

映射由 Function 函数式接口建模。实际上,函数可以接受任何类型的对象,并返回另一种类型的对象。此外,专门的函数可以将对象映射到基本类型,反之亦然。

 

过滤掉对象

另一方面,过滤不会触及您正在处理的对象。它只是决定选择其中一些,并删除其他一些。

过滤会更改对象的数量;它不会更改对象的类型。

过滤由 Predicate 函数式接口建模。实际上,谓词可以接受任何类型的对象或基本类型,并返回一个布尔值。

 

减少对象以产生结果

减少步骤比看起来更复杂。现在,我们将使用这个定义,即它与 SQL 聚合是同一类事物。想想COUNTSUMMINMAXAVERAGE。顺便说一句,所有这些聚合都受 Stream API 支持。

为了让您对这条道路上等待着您的东西有所了解:减少步骤允许您使用数据构建复杂的结构,包括列表、集合、各种类型的映射,甚至您可以自己构建的结构。请查看此页面上的第一个示例:您可以看到对 collect() 方法的调用,该方法接受由 groupingBy() 工厂方法构建的对象。此对象是一个收集器。减少可能包括使用收集器收集您的数据。收集器将在本教程的后面部分详细介绍。

 

优化 Map-Filter-Reduce 算法

让我们再举一个例子。假设您有一个城市集合。每个城市都由一个 City 类建模,该类具有两个属性:名称和人口,即居住在该城市的人数。您需要计算居住在人口超过 100,000 人的城市的总人口。

如果不使用 Stream API,您可能会编写以下代码。

List<City> cities = ...;

int sum = 0;
for (City city: cities) {
    int population = city.getPopulation();
    if (population > 100_000) {
        sum += population;
    }
}

System.out.println("Sum = " + sum);

您可以识别出对城市列表的另一个 map-filter-reduce 处理。

现在,让我们做一个简单的思想实验:假设 Stream API 不存在,并且 Collection 接口上存在 map()filter() 方法,以及 sum() 方法。

使用这些(虚构的)方法,前面的代码可以变成以下代码。

int sum = cities.map(city -> city.getPopulation())
                .filter(population -> population > 100_000)
                .sum();

从可读性和表现力方面来看,这段代码非常容易理解。所以您可能想知道:为什么这些 map 和 filter 方法没有添加到 Collection 接口中?

让我们深入探讨一下:这些 map()filter() 方法的返回类型是什么?好吧,既然我们是在集合框架中,返回一个集合似乎很自然。所以您可以这样编写代码。

Collection<Integer> populations         = cities.map(city -> city.getPopulation());
Collection<Integer> filteredPopulations = populations.filter(population -> population > 100_000);
int sum                                 = filteredPopulations.sum();

即使链接调用提高了可读性,这段代码仍然应该是正确的。

现在让我们分析这段代码。

  • 第一步是映射步骤。您看到,如果您必须处理 1,000 个城市,那么这个映射步骤会产生 1,000 个整数并将它们放入一个集合中。
  • 第二步是过滤步骤。它遍历所有元素,并根据给定的条件删除其中一些元素。这需要测试另外 1000 个元素,并创建另一个集合,可能更小。

由于这段代码返回一个集合,它映射所有城市,然后过滤得到的整数集合。这与您最初编写的 *for 循环* 方式截然不同。存储这个中间整数集合可能会导致大量开销,尤其是在您需要处理大量城市的情况下。for 循环没有这种开销:它直接将结果中的整数加起来,而不会将它们存储在中间结构中。

这种开销很糟糕,在某些情况下甚至更糟。假设您需要知道集合中是否有超过 10 万居民的城市。也许集合中的第一个城市就是这样的城市。在这种情况下,您将能够几乎不费吹灰之力地产生结果。首先,构建所有城市人口的集合,然后过滤它并检查结果是否为空将是荒谬的。

出于明显的性能原因,在 Collection 接口上创建返回 Collectionmap() 方法并不是正确的方法。您最终会创建不必要的中间结构,这会给内存和 CPU 带来很大的开销。

这就是为什么 map()filter() 方法没有添加到 Collection 接口中的原因。相反,它们是在 Stream 接口上创建的。

正确的模式如下。

Stream<City> streamOfCities         = cities.stream();
Stream<Integer> populations         = streamOfCities.map(city -> city.getPopulation());
Stream<Integer> filteredPopulations = populations.filter(population -> population > 100_000);
int sum = filteredPopulations.sum(); // in fact this code does not compile; we'll fix it later

Stream 接口避免创建中间结构来存储映射或过滤的对象。这里 map()filter() 方法仍然返回新的流。因此,为了使这段代码正常工作并保持效率,不应该在这些流中存储任何数据。在这段代码中创建的流 streamOfCitiespopulationsfilteredPopulations 必须都是空对象。

这导致了流的一个非常重要的属性

流是一个不存储任何数据的对象。

Stream API 的设计方式是,只要您在流模式中没有创建任何非流对象,就不会进行任何数据计算。在前面的示例中,您正在计算流处理的元素的总和。

这个求和操作触发了计算:cities 列表中的所有对象都通过流的所有操作逐个拉取。首先,它们被映射,然后被过滤,如果它们通过过滤步骤,则被加起来。

流按与编写等效 for 循环相同的顺序处理数据。这样就不会有内存开销。此外,在某些情况下,您可以产生结果,而无需遍历集合中的所有元素。

使用流就是创建操作管道。在某个时刻,您的数据将通过这个管道,并被转换、过滤,然后参与结果的生成。

管道由一系列对流的函数调用组成。每次调用都会产生另一个流。然后,在某个时刻,最后一次调用会产生结果。返回另一个流的操作称为中间操作。另一方面,返回其他内容(包括 void)的操作称为终止操作。

 

使用中间操作创建管道

中间操作是返回另一个流的操作。调用此类操作会在现有的操作管道上添加一个操作,而不会处理任何数据。它由返回流的函数建模。

 

使用终止操作计算结果

终止操作是不会返回流的操作。调用此类操作会触发对流源元素的消费。然后,这些元素会按顺序由中间操作管道处理,一次处理一个元素。

终止操作由返回除流以外的任何内容(包括 void)的函数建模。

您不能在一个给定的流实例上调用多个中间或终止方法。如果您这样做,您将收到一个 IllegalStateException,并显示以下消息:“流已被操作或关闭”,例如以下示例。您不能在 stream 上调用 toList() 方法,因为您已经对其调用了 map() 方法。

var stream = Stream.of(1, 2, 3, 4);
var stream1 = stream.map(i -> i + 1);
var list = stream.toList();

 

使用专门的数字流避免装箱

Stream API 为您提供了四个接口。

第一个是 Stream,您可以使用它来定义对任何类型对象的运算管道。

然后有三个专门的接口来处理数字流:IntStreamLongStreamDoubleStream。这三个流使用数字的原始类型而不是包装类型来避免装箱和拆箱。它们几乎与 Stream 中定义的方法相同,但也有一些例外。由于它们处理的是数字,因此它们具有一些在 Stream 中不存在的终止操作

  • sum():计算总和
  • min()max():计算流的最小或最大数字
  • average():计算数字的平均值
  • summaryStatistics():此调用会生成一个特殊的对象,该对象包含多个统计信息,所有这些统计信息都在对您的数据进行一次遍历时计算得出。这些统计信息是流处理的元素数量、最小值、最大值、总和和平均值。

 

遵循最佳实践

如您所见,您只允许在一个流上调用一个方法,即使该方法是中间方法。因此,将流存储在字段或局部变量中是无用的,有时甚至是危险的。编写接受流作为参数的方法也可能很危险,因为您无法确定接收到的流是否已被操作。流应该在现场创建和使用。

流是一个与源连接的对象。它从该源拉取它处理的元素。该源不应该被流本身修改。这样做会导致不可预知的结果。在某些情况下,该源是不可变的或只读的,因此您将无法这样做,但也有可能这样做。

Stream 接口中提供了大量方法,您将在本教程中看到其中大多数方法。编写修改流本身外部的某些变量或字段的操作是一个不好的主意,始终可以避免。流不应该有任何 *副作用*。


上次更新: 2021 年 9 月 14 日


当前教程
使用 Stream API 在内存中处理数据
系列中的下一个

系列中的下一个: 在 Stream 上添加中间操作