创建自己的收集器
了解收集器的工作原理
正如我们之前提到的,Collectors
工厂类只处理对象流,因为collect()
方法接受收集器对象作为参数,只存在于Stream
中。如果你需要收集数字流,那么你需要了解收集器的构建元素。
简而言之,收集器建立在四个基本组件之上。前两个用于收集流中的元素。第三个只在并行流中需要。第四个在某些类型的收集器中需要,这些收集器需要对构建的容器进行后处理。
第一个组件用于创建容器,流中的元素将被收集到该容器中。这个容器很容易识别。例如,在我们之前部分中介绍的案例中,我们使用了ArrayList
类、HashSet
类或HashMap
类。创建这样的容器可以用Supplier
的实例来建模。这个第一个组件被称为供应商。
第二个组件模拟将流中的单个元素添加到此容器中。此操作将由 Stream API 的实现重复调用,以将流中的所有元素逐个添加到容器中。
在收集器 API 中,此组件由BiConsumer
的实例来建模。这个双消费者接受两个参数。
- 第一个是容器本身,其中部分填充了流中的先前元素。
- 第二个是应该添加到此部分填充的容器中的流元素。
这个双消费者在收集器 API 的上下文中被称为累加器。
这两个组件应该足以使收集器工作,但 Stream API 带来了一个约束,使得收集器正常工作需要两个额外的组件。
你可能还记得 Stream API 支持并行化。这一点将在本教程的后面部分详细介绍。你需要知道的是,并行化将你的流中的元素拆分成子流,每个子流由你的 CPU 的一个核心处理。收集器 API 可以在这种情况下工作:每个子流只会被收集到你的收集器创建的容器的自己的实例中。
一旦这些子流被处理,你将拥有多个容器,每个容器都包含它处理的子流中的元素。这些容器是相同的,因为它们是用相同的供应商创建的。现在,你需要一种方法将它们合并成一个。为了能够做到这一点,收集器 API 需要第三个组件,一个组合器,它将这些容器合并在一起。组合器由BinaryOperator
的实例来建模,它接受两个部分填充的容器并返回一个。
这个BinaryOperator
也由 Stream API 中的BiConsumer
来建模。
第四个组件被称为完成器,将在本部分的后面介绍。
将原始类型收集到集合中
使用前三个组件,你可以使用collect()
方法来自专门的数字流。该IntStream.collect()
方法接受三个参数
- 一个
Supplier
的实例,称为供应商; - 一个
ObjIntConsumer
的实例,称为累加器; - 一个
BiConsumer
的实例,称为组合器。
让我们编写代码以将IntStream
收集到List<Integer>
的实例中。
Supplier<List<Integer>> supplier = ArrayList::new;
ObjIntConsumer<List<Integer>> accumulator = Collection::add;
BiConsumer<List<Integer>, List<Integer>> combiner = Collection::addAll;
List<Integer> collect =
IntStream.range(0, 10)
.collect(supplier, accumulator, combiner );
System.out.println("collect = " + collect);
运行此代码会产生以下结果。
collect = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
将此数据收集到一个集合中只需要更改supplier
的实现并相应地调整类型。
将原始类型收集到 StringBuffer 中
让我们检查一下如何实现Collectors.joining()
的等效项,以将原始类型流中的元素连接到单个字符字符串中。该String
类是不可变的,因此你无法在其中累积元素。与其使用String
类,你可以使用StringBuffer
类,它是可变的。
在StringBuffer
中收集元素遵循与前一个相同的模式。
Supplier<StringBuffer> supplier = StringBuffer::new;
ObjIntConsumer<StringBuffer> accumulator = StringBuffer::append;
BiConsumer<StringBuffer, StringBuffer> combiner = StringBuffer::append;
StringBuffer collect =
IntStream.range(0, 10)
.collect(supplier, accumulator, combiner);
System.out.println("collect = " + collect);
运行此代码会产生以下结果。
collect = 0123456789
使用完成器对收集器进行后处理
你在上一段中编写的代码几乎完成了你需要做的:它将字符字符串连接到StringBuffer
的实例中,你可以通过调用它的toString()
方法来创建一个常规的String
对象。但是Collectors.joining()
收集器直接生成一个String
,而无需你调用toString()
。那么它是如何做到的呢?
Collector API 定义了第四个组件,专门用于处理这种情况,称为 *finisher*。Finisher 是 Function
的实例,它接收累积元素的容器并将其转换为其他内容。在 Collectors.joining()
的情况下,此函数只是以下内容。
Function<StringBuffer, String> finisher = stringBuffer -> stringBuffer.toString();
许多收集器中,finisher 只是身份函数。以下收集器就是这种情况:toList()
、toSet()
、groupingBy()
和 toMap()
。
在所有其他情况下,收集器内部使用的可变容器将成为一个中间容器,在返回给应用程序之前,它将被映射到其他对象,可能是另一个容器。这就是 Collector API 如何处理不可变列表、集合或映射的创建。Finisher 用于在将中间容器返回给您的应用程序之前将其密封到不可变容器中。
Finisher 的其他用途可以提高代码的可读性。 Collectors
工厂类有一个工厂方法,我们还没有介绍:collectingAndThen()
方法。此方法将收集器作为第一个参数,将 finisher 作为第二个参数。它只会将此函数应用于使用第一个收集器收集流的结果,然后使用您提供的函数对其进行映射。
您可能还记得以下示例,我们在前面的部分中已经多次检查过它。它与提取直方图的最大值有关。
Collection<String> strings =
List.of("two", "three", "four", "five", "six", "seven", "eight", "nine",
"ten", "eleven", "twelve");
Map<Integer, Long> histogram =
strings.stream()
.collect(
Collectors.groupingBy(
String::length,
Collectors.counting()));
Map.Entry<Integer, Long> maxValue =
histogram.entrySet().stream()
.max(Map.Entry.comparingByValue())
.orElseThrow();
System.out.println("maxValue = " + maxValue);
在第一步中,您构建了一个类型为 Map<Inter, Long>
的直方图,在第二步中,您通过按值比较键值对来提取此直方图的最大值。
实际上,第二步是将映射转换为来自此映射的特殊键值对。您可以使用以下函数对其进行建模。
Function<Map<Integer, Long>, Map.Entry<Integer, Long>> finisher =
map -> map.entrySet().stream()
.max(Map.Entry.comparingByValue())
.orElseThrow();
此函数的类型乍一看可能很复杂。实际上,它只是从映射中提取键值对。因此,它接收 Map
的实例,并返回来自该映射的键值对,它是 Map.Entry
的实例,类型相同。
现在您有了此函数,您可以通过使用 collectingAndThen()
将此最大值提取步骤集成到收集器本身中。然后模式变为以下内容。
Collection<String> strings =
List.of("two", "three", "four", "five", "six", "seven", "eight", "nine",
"ten", "eleven", "twelve");
Function<Map<Integer, Long>, Map.Entry<Integer, Long>> finisher =
map -> map.entrySet().stream()
.max(Map.Entry.comparingByValue())
.orElseThrow();
Map.Entry<Integer, Long> maxValue =
strings.stream()
.collect(
Collectors.collectingAndThen(
Collectors.groupingBy(
String::length,
Collectors.counting()),
finisher
));
System.out.println("maxValue = " + maxValue);
您可能想知道为什么要编写这种看起来很复杂的代码?
现在您有了由单个收集器建模的最大值提取器,您可以将其用作另一个收集器的下游收集器。能够做到这一点可以使多个收集器组合起来,对您的数据进行更复杂的计算。
使用 Teeing 收集器组合两个收集器的结果
Java SE 12 中的 Collectors
类中添加了一个方法,称为 teeing()
。此方法接收两个下游收集器和一个合并函数。
让我们通过一个用例来了解您可以使用收集器做什么。假设您有以下 Car
和 Truck
记录。
enum Color {
RED, BLUE, WHITE, YELLOW
}
enum Engine {
ELECTRIC, HYBRID, GAS
}
enum Drive {
WD2, WD4
}
interface Vehicle {}
record Car(Color color, Engine engine, Drive drive, int passengers) {}
record Truck(Engine engine, Drive drive, int weight) {}
汽车对象有几个组件:颜色、发动机、驱动器以及它可以运输的乘客数量。卡车有发动机、驱动器,并且可以运输一定数量的货物。两者都实现了相同的接口:Vehicle
。
假设您有一个车辆集合,您需要找到所有带有电动发动机的汽车。根据您的应用程序,您最终可能会使用流过滤您的汽车集合。或者,如果您知道下一个请求将是获取带有混合动力发动机的汽车,您可能更愿意准备一个映射,以发动机作为键,以及带有发动机类型的汽车列表作为值。在这两种情况下,Stream API 都将为您提供获取所需内容的正确模式。
假设您需要将所有电动卡车添加到此集合中。仍然可以使用一次遍历您的车辆集合来创建此并集,但您将用于过滤数据的谓词将变得越来越复杂。它可能看起来像这样。
Predicate<Vehicle> predicate =
vehicle -> vehicle instanceof Car car && car.engine() == Engine.ELECTRIC ||
vehicle instanceof Truck truck && truck.engine() == Engine.ELECTRIC;
您真正需要的是以下内容
- 过滤车辆以获取所有电动汽车
- 过滤它们以获取所有电动卡车
- 合并两个结果。
这正是 teeing 收集器可以为您做的事情。teeing 收集器由 Collectors.teeing()
工厂方法创建,该方法接收三个参数。
- 第一个下游收集器,用于收集流的数据。
- 第二个下游收集器,也用于以独立的方式收集您的数据。
- 一个双函数,用于合并由两个下游收集器创建的两个容器。
您的数据在一趟中处理,以确保最佳性能。
我们已经介绍了可以使用收集器过滤流元素的模式。合并函数只是对 Collection.addAll()
方法的调用。以下是代码
List<Vehicle> electricVehicles = vehicles.stream()
.collect(
Collectors.teeing(
Collectors.filtering(
vehicle -> vehicle instanceof Car car && car.engine() == Engine.ELECTRIC,
Collectors.toList()),
Collectors.filtering(
vehicle -> vehicle instanceof Truck truck && truck.engine() == Engine.ELECTRIC,
Collectors.toList()),
(cars, trucks) -> {
cars.addAll(trucks);
return cars;
}));
上次更新: 2021 年 9 月 14 日