使用 Stream API 在内存中处理数据
介绍 Stream API
Stream API 可能是继 lambda 表达式之后,添加到 Java SE 8 的第二个最重要的功能。简而言之,Stream API 的目标是为 JDK 提供一个著名的 map-filter-reduce 算法的实现。
集合框架用于在 JVM 的内存中存储和组织数据。您可以将 Stream API 看作是集合框架的配套框架,用于以非常高效的方式处理这些数据。实际上,您可以对集合打开一个流来处理它包含的数据。
它并不止于此:Stream API 可以为您做更多的事情,而不仅仅是处理来自集合的数据。JDK 为您提供了多种模式,可以在其他来源(包括 I/O 来源)上创建流。此外,您可以轻松地创建自己的数据源,以完美地满足您的需求。
当您掌握 Stream API 后,您就可以编写非常具有表现力的代码。以下是一小段代码,您可以使用正确的静态导入进行编译
List<String> strings = List.of("one","two","three","four");
var map = strings.stream()
.collect(groupingBy(String::length, counting()));
map.forEach((key, value) -> System.out.println(key + " :: " + value));
此代码打印出以下内容。
- 它使用
groupingBy(String::length)
按字符串长度对字符串进行分组 - 它使用
counting()
统计每个长度的字符串数量 - 然后它创建一个
Map<Integer, Long>
来存储结果
运行此代码会产生以下结果。
3 :: 2
4 :: 1
5 :: 1
即使您不熟悉 Stream API,阅读使用它的代码也能让您一目了然地了解它的作用。
介绍 Map-Filter-Reduce 算法
在深入了解 Stream API 本身之前,让我们看看您需要了解的 map-filter-reduce 算法的元素。
此算法是处理数据的经典算法。让我们举个例子。假设您有一组 Sale
对象,它们具有三个属性:日期、产品参考和金额。为简单起见,我们假设金额只是一个整数。以下是您的 Sale
类。
public class Sale {
private String product;
private LocalDate date;
private int amount;
// constructors, getters, setters
// equals, hashCode, toString
}
假设您需要计算 3 月份的销售总额。您可能会编写以下代码。
List<Sale> sales = ...; // this is the list of all the sales
int amountSoldInMarch = 0;
for (Sale sale: sales) {
if (sale.getDate().getMonth() == Month.MARCH) {
amountSoldInMarch += sale.getAmount();
}
}
System.out.println("Amount sold in March: " + amountSoldInMarch);
您可以在这个简单的數據处理算法中看到三个步骤。
第一步是只考虑 3 月份发生的销售。您正在根据给定条件过滤掉一些您正在处理的元素。这正是过滤步骤。
第二步是从 sale
对象中提取一个属性。您对整个对象不感兴趣;您需要的是它的 amount
属性。您正在将 sale
对象映射到一个金额,即一个 int
值。这是映射步骤;它包括将您正在处理的对象转换为其他对象或值。
最后一步是将所有这些金额加起来得到一个金额。如果您熟悉 SQL 语言,您会发现最后一步看起来像一个聚合。实际上,它确实做了同样的事情。这个总和是对各个金额的减少,得到一个金额。
顺便说一句,SQL 语言在以可读的方式表达这种处理方面做得很好。您需要的 SQL 代码非常容易阅读
select sum(amount)
from Sales
where extract(month from date) = 3;
指定结果而不是编程算法
您可以看到,在 SQL 中,您编写的是对所需结果的描述:3 月份所有销售的金额总和。您的数据库服务器负责弄清楚如何有效地计算它。
计算此金额的 Java 代码片段是对如何计算此金额的逐步描述。它以命令式的方式进行了精确描述。它给 Java 运行时留下了很少的优化此计算的空间。
Stream API 的两个目标是使您能够创建更易读和更具表现力的代码,并为 Java 运行时提供一些优化计算的空间。
将对象映射到其他对象或值
map-filter-reduce 算法的第一步是映射步骤。映射包括转换您正在处理的对象或值。映射是一对一的转换:如果您映射一个包含 10 个对象的列表,您将得到一个包含 10 个已转换对象的列表。
在 Stream API 中,映射步骤添加了另一个约束。假设您正在处理一个有序对象的集合。它可以是一个列表,或者其他一些有序对象的来源。当您映射该列表时,您获得的第一个对象应该是源中第一个对象的映射。换句话说:映射步骤尊重对象的顺序;它不会打乱它们。
映射会更改对象的类型;它不会更改对象的数量。
映射由 Function
函数式接口建模。实际上,函数可以接受任何类型的对象,并返回另一种类型的对象。此外,专门的函数可以将对象映射到基本类型,反之亦然。
过滤掉对象
另一方面,过滤不会触及您正在处理的对象。它只是决定选择其中一些,并删除其他一些。
过滤会更改对象的数量;它不会更改对象的类型。
过滤由 Predicate
函数式接口建模。实际上,谓词可以接受任何类型的对象或基本类型,并返回一个布尔值。
减少对象以产生结果
减少步骤比看起来更复杂。现在,我们将使用这个定义,即它与 SQL 聚合是同一类事物。想想COUNT、SUM、MIN、MAX、AVERAGE。顺便说一句,所有这些聚合都受 Stream API 支持。
为了让您对这条道路上等待着您的东西有所了解:减少步骤允许您使用数据构建复杂的结构,包括列表、集合、各种类型的映射,甚至您可以自己构建的结构。请查看此页面上的第一个示例:您可以看到对 collect()
方法的调用,该方法接受由 groupingBy()
工厂方法构建的对象。此对象是一个收集器。减少可能包括使用收集器收集您的数据。收集器将在本教程的后面部分详细介绍。
优化 Map-Filter-Reduce 算法
让我们再举一个例子。假设您有一个城市集合。每个城市都由一个 City
类建模,该类具有两个属性:名称和人口,即居住在该城市的人数。您需要计算居住在人口超过 100,000 人的城市的总人口。
如果不使用 Stream API,您可能会编写以下代码。
List<City> cities = ...;
int sum = 0;
for (City city: cities) {
int population = city.getPopulation();
if (population > 100_000) {
sum += population;
}
}
System.out.println("Sum = " + sum);
您可以识别出对城市列表的另一个 map-filter-reduce 处理。
现在,让我们做一个简单的思想实验:假设 Stream API 不存在,并且 Collection
接口上存在 map()
和 filter()
方法,以及 sum()
方法。
使用这些(虚构的)方法,前面的代码可以变成以下代码。
int sum = cities.map(city -> city.getPopulation())
.filter(population -> population > 100_000)
.sum();
从可读性和表现力方面来看,这段代码非常容易理解。所以您可能想知道:为什么这些 map 和 filter 方法没有添加到 Collection
接口中?
让我们深入探讨一下:这些 map()
和 filter()
方法的返回类型是什么?好吧,既然我们是在集合框架中,返回一个集合似乎很自然。所以您可以这样编写代码。
Collection<Integer> populations = cities.map(city -> city.getPopulation());
Collection<Integer> filteredPopulations = populations.filter(population -> population > 100_000);
int sum = filteredPopulations.sum();
即使链接调用提高了可读性,这段代码仍然应该是正确的。
现在让我们分析这段代码。
- 第一步是映射步骤。您看到,如果您必须处理 1,000 个城市,那么这个映射步骤会产生 1,000 个整数并将它们放入一个集合中。
- 第二步是过滤步骤。它遍历所有元素,并根据给定的条件删除其中一些元素。这需要测试另外 1000 个元素,并创建另一个集合,可能更小。
由于这段代码返回一个集合,它映射所有城市,然后过滤得到的整数集合。这与您最初编写的 *for 循环* 方式截然不同。存储这个中间整数集合可能会导致大量开销,尤其是在您需要处理大量城市的情况下。for 循环没有这种开销:它直接将结果中的整数加起来,而不会将它们存储在中间结构中。
这种开销很糟糕,在某些情况下甚至更糟。假设您需要知道集合中是否有超过 10 万居民的城市。也许集合中的第一个城市就是这样的城市。在这种情况下,您将能够几乎不费吹灰之力地产生结果。首先,构建所有城市人口的集合,然后过滤它并检查结果是否为空将是荒谬的。
出于明显的性能原因,在 Collection
接口上创建返回 Collection
的 map()
方法并不是正确的方法。您最终会创建不必要的中间结构,这会给内存和 CPU 带来很大的开销。
这就是为什么 map()
和 filter()
方法没有添加到 Collection
接口中的原因。相反,它们是在 Stream
接口上创建的。
正确的模式如下。
Stream<City> streamOfCities = cities.stream();
Stream<Integer> populations = streamOfCities.map(city -> city.getPopulation());
Stream<Integer> filteredPopulations = populations.filter(population -> population > 100_000);
int sum = filteredPopulations.sum(); // in fact this code does not compile; we'll fix it later
Stream
接口避免创建中间结构来存储映射或过滤的对象。这里 map()
和 filter()
方法仍然返回新的流。因此,为了使这段代码正常工作并保持效率,不应该在这些流中存储任何数据。在这段代码中创建的流 streamOfCities
、populations
和 filteredPopulations
必须都是空对象。
这导致了流的一个非常重要的属性
流是一个不存储任何数据的对象。
Stream API 的设计方式是,只要您在流模式中没有创建任何非流对象,就不会进行任何数据计算。在前面的示例中,您正在计算流处理的元素的总和。
这个求和操作触发了计算:cities
列表中的所有对象都通过流的所有操作逐个拉取。首先,它们被映射,然后被过滤,如果它们通过过滤步骤,则被加起来。
流按与编写等效 for 循环相同的顺序处理数据。这样就不会有内存开销。此外,在某些情况下,您可以产生结果,而无需遍历集合中的所有元素。
使用流就是创建操作管道。在某个时刻,您的数据将通过这个管道,并被转换、过滤,然后参与结果的生成。
管道由一系列对流的函数调用组成。每次调用都会产生另一个流。然后,在某个时刻,最后一次调用会产生结果。返回另一个流的操作称为中间操作。另一方面,返回其他内容(包括 void)的操作称为终止操作。
使用中间操作创建管道
中间操作是返回另一个流的操作。调用此类操作会在现有的操作管道上添加一个操作,而不会处理任何数据。它由返回流的函数建模。
使用终止操作计算结果
终止操作是不会返回流的操作。调用此类操作会触发对流源元素的消费。然后,这些元素会按顺序由中间操作管道处理,一次处理一个元素。
终止操作由返回除流以外的任何内容(包括 void)的函数建模。
您不能在一个给定的流实例上调用多个中间或终止方法。如果您这样做,您将收到一个 IllegalStateException
,并显示以下消息:“流已被操作或关闭”,例如以下示例。您不能在 stream
上调用 toList()
方法,因为您已经对其调用了 map()
方法。
var stream = Stream.of(1, 2, 3, 4);
var stream1 = stream.map(i -> i + 1);
var list = stream.toList();
使用专门的数字流避免装箱
Stream API 为您提供了四个接口。
第一个是 Stream
,您可以使用它来定义对任何类型对象的运算管道。
然后有三个专门的接口来处理数字流:IntStream
、LongStream
和 DoubleStream
。这三个流使用数字的原始类型而不是包装类型来避免装箱和拆箱。它们几乎与 Stream
中定义的方法相同,但也有一些例外。由于它们处理的是数字,因此它们具有一些在 Stream
中不存在的终止操作
sum()
:计算总和min()
、max()
:计算流的最小或最大数字average()
:计算数字的平均值summaryStatistics()
:此调用会生成一个特殊的对象,该对象包含多个统计信息,所有这些统计信息都在对您的数据进行一次遍历时计算得出。这些统计信息是流处理的元素数量、最小值、最大值、总和和平均值。
遵循最佳实践
如您所见,您只允许在一个流上调用一个方法,即使该方法是中间方法。因此,将流存储在字段或局部变量中是无用的,有时甚至是危险的。编写接受流作为参数的方法也可能很危险,因为您无法确定接收到的流是否已被操作。流应该在现场创建和使用。
流是一个与源连接的对象。它从该源拉取它处理的元素。该源不应该被流本身修改。这样做会导致不可预知的结果。在某些情况下,该源是不可变的或只读的,因此您将无法这样做,但也有可能这样做。
Stream
接口中提供了大量方法,您将在本教程中看到其中大多数方法。编写修改流本身外部的某些变量或字段的操作是一个不好的主意,始终可以避免。流不应该有任何 *副作用*。
上次更新: 2021 年 9 月 14 日