当前教程
在流上添加中间操作
系列中的下一篇

系列中的上一篇: 使用流 API 在内存中处理数据

系列中的下一篇: 创建流

在流上添加中间操作

 

将流映射到另一个流

流映射包括使用函数转换其元素。此转换可能会更改该流处理的元素的类型,但您也可以在不更改其类型的情况下转换它们。

您可以使用 map() 方法将流映射到另一个流,该方法将此 Function 作为参数。流映射意味着该流处理的所有元素都将使用该函数进行转换。

代码模式如下

List<String> strings = List.of("one", "two", "three", "four");
Function<String, Integer> toLength = String::length;
Stream<Integer> ints = strings.stream()
                              .map(toLength);

您可以复制此代码,并将其粘贴到您的 IDE 中以运行它。您将看不到任何内容,您可能想知道为什么。

答案实际上很简单:该流上没有定义终端操作。您的反射应该是注意到这一点并意识到这段代码没有做任何事情。它不处理任何数据。要回答这个问题:“这段代码在做什么?”,只有一个有效的答案:“什么也没做”。

让我们添加一个非常有用的终端操作,它将处理后的元素放入列表中:collect(Collectors.toList())。如果您不确定这段代码到底在做什么,请不要担心;我们将在本教程的后面部分介绍它。代码变为如下。

List<String> strings = List.of("one", "two", "three", "four");
List<Integer> lengths = strings.stream()
                               .map(String::length)
                               .collect(Collectors.toList());
System.out.println("lengths = " + lengths);

运行此代码将打印以下内容

lengths = [3, 3, 5, 4]

您可以看到此模式创建了一个 Stream<Integer>,由 map(String::length) 返回。您也可以通过调用 mapToInt() 而不是常规的 map() 调用,将其转换为专门的 IntStream。此 mapToInt() 方法将 ToIntFuction<T> 作为参数。在前面的示例中将 .map(String::length) 更改为 .mapToInt(String::length) 不会创建编译错误。方法引用 String::length 可以是两种类型:Function<String, Integer>ToIntFunction<String>

在专门的流上没有 collect() 方法接受 Collector 作为参数。因此,如果您使用 mapToInt(),您将无法再将结果收集到列表中,至少不能使用这种模式。让我们对该流进行一些统计。此 summaryStatistics() 方法非常方便,并且仅在这些专门的原始类型流上可用。

List<String> strings = List.of("one", "two", "three", "four");
IntSummaryStatistics stats = strings.stream()
                                    .mapToInt(String::length)
                                    .summaryStatistics();
System.out.println("stats = " + stats);

结果如下

stats = IntSummaryStatistics{count=4, sum=15, min=3, average=3,750000, max=5}

有三种方法可以从 Stream 到原始类型流:mapToInt()mapToLong()mapToDouble()

 

过滤流

过滤是关于使用谓词丢弃流处理的某些元素。此方法在对象流和原始类型流上可用。

假设您需要计算长度为 3 的字符字符串的数量。您可以编写以下代码来执行此操作

List<String> strings = List.of("one", "two", "three", "four");
long count = strings.stream()
                    .map(String::length)
                    .filter(length -> length == 3)
                    .count();
System.out.println("count = " + count);

运行此代码将产生以下结果

count = 2

请注意,您刚刚使用了 Stream API 的另一个终端操作,count(),它只计算处理过的元素的数量。此方法返回一个 long,因此您可以使用它来计算很多元素。比您可以放入 ArrayList 中的元素还要多。

 

将流扁平化以处理 1:p 关系

让我们在示例中查看 flatMap 操作。假设您有两个实体:StateCity。一个 state 实例包含多个 city 实例,存储在列表中。

这是 City 类的代码。

public class City {
    
    private String name;
    private int population;

    // constructors, getters
    // toString, equals and hashCode
}

这是 State 类的代码,以及与 City 类的关系。

public class State {
    
    private String name;
    private List<City> cities;

    // constructors, getters
    // toString, equals and hashCode
}

假设您的代码正在处理一个州列表,并且在某个时候您需要计算所有城市的总人口。

您可以编写以下代码

List<State> states = ...;

int totalPopulation = 0;
for (State state: states) {
    for (City city: state.getCities()) {
        totalPopulation += city.getPopulation();
    }
}

System.out.println("Total population = " + totalPopulation);

此代码的内部循环是您可以使用以下流编写的 map-reduce 形式

totalPopulation += state.getCities().stream().mapToInt(City::getPopulation).sum();

州循环和此流之间的连接不适合 map/reduce 模式,并且在循环中放置流不是一个很好的代码模式。

这正是 flatmap 运算符的作用。此运算符打开对象之间的一对多关系,并在这些关系上创建流。 flatMap() 方法将一个特殊函数作为参数,该函数返回一个 Stream 对象。此函数定义了给定类与另一个类之间的关系。

在我们的示例中,此函数很简单,因为 State 类中有一个 List<City>。因此,您可以用以下方式编写它。

Function<State, Stream<City>> stateToCity = state -> state.getCities().stream();

此列表不是必需的。假设您有一个 Continent 类,它保存一个 Map<String, Country>,其中键是国家/地区的国家/地区代码(CAN 代表加拿大,MEX 代表墨西哥,FRA 代表法国,等等)。假设 Continent 类有一个方法 getCountries(),它返回此映射。

在这种情况下,此函数可以这样编写。

Function<Continent, Stream<Country>> continentToCountry = 
    continent -> continent.getCountries().values().stream();

flatMap() 方法以两个步骤处理流。

  • 第一步包括使用此函数映射流的所有元素。从 Stream<State> 它创建一个 Stream<Stream<City>>,因为每个州都映射到一个城市流。
  • 第二步包括扁平化生成的流流。您最终不会拥有一个城市流流(每个州一个流),而是拥有一个单一流,其中包含所有州的所有城市。

因此,使用嵌套 for 循环模式编写的代码可以变成以下代码,这要归功于 flatmap 运算符。

List<State> states = ...;

int totalPopulation = 
        states.stream()
              .flatMap(state -> state.getCities().stream())
              .mapToInt(City::getPopulation)
              .sum();

System.out.println("Total population = " + totalPopulation);

 

使用 Flatmap 和 MapMulti 验证元素转换

flatMap 操作可用于验证流元素的转换。

假设您有一系列表示整数的字符字符串。您需要使用 Integer.parseInt() 将它们转换为整数。不幸的是,其中一些字符串已损坏:可能有些是空的、为空的,或者在末尾有额外的空格字符。所有这些都会导致解析失败,并出现 NumberFormatException。当然,您可以尝试使用谓词过滤此流以删除错误字符串,但最安全的方法是使用 try-catch 模式。

尝试使用过滤器不是正确的方法。您将要编写的谓词将如下所示。

Predicate<String> isANumber = s -> {
    try {
        int i = Integer.parseInt(s);
        return true;
    } catch (NumberFormatException e) {
        return false;
    }
};

第一个缺陷是您实际上需要进行转换才能查看它是否有效。然后您将不得不再次在您的映射函数中执行它,该函数将在下一步执行:不要这样做!第二个缺陷是返回 catch 块永远不是一个好主意。

您真正需要做的是,当您在该字符串中拥有一个正确的整数时返回一个整数,如果它是一个损坏的字符串则返回空。这是 flatmapper 的工作。如果您可以解析一个整数,则可以返回一个包含结果的流。在另一种情况下,您可以返回一个空流。

然后您可以编写以下函数。

Function<String, Stream<Integer>> flatParser = s -> {
    try {
        return Stream.of(Integer.parseInt(s));
    } catch (NumberFormatException e) {
    }
    return Stream.empty();
};

List<String> strings = List.of("1", " ", "2", "3 ", "", "3");
List<Integer> ints = 
    strings.stream()
           .flatMap(flatParser)
           .collect(Collectors.toList());
System.out.println("ints = " + ints);

运行此代码会产生以下结果。所有有故障的字符串都被静默删除了。

ints = [1, 2, 3]

这种 flatmap 代码的使用效果很好,但它有一个开销:您需要处理的每个流元素都会创建一个流。从 Java SE 16 开始,Stream API 中添加了一个方法。它专门针对这种情况添加:当您创建许多零个或一个对象的流时。此方法称为 mapMulti(),并接受一个 BiConsumer 作为参数。

BiConsumer 使用两个参数

  • 需要映射的流元素
  • 一个 Consumer,此 BiConsumer 需要使用映射结果调用它

使用元素调用使用者会将该元素添加到结果流中。如果无法进行映射,则双使用者不会调用此使用者,并且不会添加任何元素。

让我们使用此 mapMulti() 方法重写您的模式。

List<Integer> ints =
        strings.stream()
               .<Integer>mapMulti((string, consumer) -> {
                    try {
                        consumer.accept(Integer.parseInt(string));
                    } catch (NumberFormatException ignored) {
                    }
               })
               .collect(Collectors.toList());
System.out.println("ints = " + ints);

运行此代码会产生与之前相同的结果。所有有故障的字符串都被静默删除了,但这次没有创建其他流。

ints = [1, 2, 3]

要使用此方法,您需要告诉编译器用于将元素添加到结果流的 Consumer 的类型。这是通过这种特殊语法完成的,您将此类型放在调用 mapMulti() 之前。这不是您在 Java 代码中经常看到的语法。您可以在静态和非静态上下文中使用它。

 

删除重复项并对流进行排序

Stream API 有两个方法,distinct()sorted(),它们只会检测和删除重复项并对流的元素进行排序。该 distinct() 方法使用 hashCode()equals() 方法来发现重复项。该 sorted() 方法有一个重载,它接受一个比较器,该比较器将用于比较和排序流的元素。如果您没有提供比较器,则 Stream API 假设流的元素是可比较的。如果它们不可比较,则会引发 ClassCastException

您可能还记得本教程的前面部分,流应该是一个不存储任何数据的空对象。这条规则有几个例外,这两个方法就属于例外。

实际上,为了发现重复项,该 distinct() 方法需要存储流的元素。当它处理一个元素时,它首先检查该元素是否已经被看到过。

对于 sorted() 方法也是如此。此方法需要存储所有元素,然后在将它们发送到处理管道的下一步之前在内部缓冲区中对其进行排序。

distinct() 方法可以用于无界(无限)流,该 sorted() 方法不能。

 

限制和跳过流的元素

Stream API 提供了两种选择流元素的方法:基于它们的索引,或者使用谓词。

第一种方法是使用 skip()limit() 方法,两者都接受一个 long 作为参数。在使用这些方法时,需要避免一个陷阱。您需要记住,每次在流上调用中间方法时,都会创建一个新的流。因此,如果您在 skip() 之后调用 limit(),请不要忘记从该新流开始计算您的元素。

假设您有一个从 1 开始的所有整数的流。您需要在整数流上选择 3 到 8 之间的整数。您可能很想调用 skip(2).limit(8),传递在第一个流上计算的边界。不幸的是,这不是流的工作方式。第二个调用 limit(8) 在从 3 开始的流上运行,因此它将选择直到 11 的整数,这不是您需要的。正确的代码如下。

List<Integer> ints = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

List<Integer> result = 
    ints.stream()
        .skip(2)
        .limit(5)
        .collect(Collectors.toList());

System.out.println("result = " + result);

此代码打印以下内容。

result = [3, 4, 5, 6, 7]

重要的是要理解 skip(2) 已在处理元素 1, 2, 3, ... 的流上调用,并产生另一个处理元素 3, 4, 5, 6, ... 的流。

因此 limit(3) 选择该流的前 5 个元素,因此 3, 4, 5, 6, 7

Java SE 9 在此领域引入了另外两种方法。它不是根据流中的索引跳过和限制元素,而是根据谓词的值进行操作。

  • dropWhile(predicate) 会丢弃流处理的元素,直到对这些元素应用谓词变为真。此时,流处理的所有元素都会被传输到下一个流。
  • takeWhile(predicate) 做相反的事情:它会将元素传输到下一个流,直到对这些元素应用此谓词变为假。

请注意,这些方法就像门一样。一旦 dropWhile() 打开门让处理过的元素流过,它就不会关闭它。一旦 takeWhile() 关闭门,它就不能重新打开它,不会再有元素被发送到下一个操作。

 

连接流

Stream API 提供了几种将多个流连接成一个流的模式。最明显的方法是使用在 Stream 接口中定义的工厂方法:concat().

此方法接受两个流,并生成一个流,其中包含第一个流生成的元素,后跟第二个流的元素。

您可能想知道为什么此方法不接受可变参数以允许连接任意数量的流。

原因是,只要您有两个流要连接,使用此方法就可以了。如果您有两个以上的流,那么 JavaDoc API 文档建议您使用另一种模式,该模式基于 flatmap 的使用。

让我们看看它在示例中的工作原理。

List<Integer> list0 = List.of(1, 2, 3);
List<Integer> list1 = List.of(4, 5, 6);
List<Integer> list2 = List.of(7, 8, 9);

// 1st pattern: concat
List<Integer> concat = 
    Stream.concat(list0.stream(), list1.stream())
          .collect(Collectors.toList());

// 2nd pattern: flatMap
List<Integer> flatMap =
    Stream.of(list0.stream(), list1.stream(), list2.stream())
          .flatMap(Function.identity())
          .collect(Collectors.toList());

System.out.println("concat  = " + concat);
System.out.println("flatMap = " + flatMap);

运行此代码会产生以下结果

concat  = [1, 2, 3, 4, 5, 6]
flatMap = [1, 2, 3, 4, 5, 6, 7, 8, 9]

使用 flatMap() 方式更好的原因是 concat() 在连接期间创建中间流。当您使用 Stream.concat() 时,会创建一个新流来连接您的两个流。如果您需要连接三个流,您最终将创建一个第一个流来处理第一个连接,以及一个第二个流来处理第二个连接。因此,每次连接都需要一个很快就会被丢弃的流。

使用 flatmap 模式,您只需创建一个单个流来保存所有流并执行 flatmap。开销要低得多。

您可能想知道为什么添加了这两种模式。看起来 concat() 并不是真正有用。实际上,由 concat 和 flatmap 模式生成的流之间存在细微差别。

如果您要连接的两个流的源的大小已知,那么结果流的大小也已知。实际上,它只是两个连接流的总和。

对流使用 flatmap 可能会在结果流中创建未知数量的元素。Stream API 会丢失对结果流中将要处理的元素数量的跟踪。

换句话说:concat 生成一个 SIZED 流,而 flatmap 则不会。这个 SIZED 属性是流可能具有的属性,我们将在本教程的后面部分介绍。

 

调试流

有时在运行时检查流处理的元素可能很方便。Stream API 提供了一个方法来实现这一点:peek() 方法。此方法旨在用于调试数据处理管道。您不应该在生产代码中使用此方法。

您绝对应该避免使用此方法在应用程序中执行副作用。

此方法接受一个消费者作为参数,API 将在流的每个元素上调用该消费者。让我们看看这个方法的实际应用。

List<String> strings = List.of("one", "two", "three", "four");
List<String> result =
        strings.stream()
                .peek(s -> System.out.println("Starting with = " + s))
                .filter(s -> s.startsWith("t"))
                .peek(s -> System.out.println("Filtered = " + s))
                .map(String::toUpperCase)
                .peek(s -> System.out.println("Mapped = " + s))
                .collect(Collectors.toList());
System.out.println("result = " + result);

如果您运行此代码,您将在控制台中看到以下内容。

Starting with = one
Starting with = two
Filtered = two
Mapped = TWO
Starting with = three
Filtered = three
Mapped = THREE
Starting with = four
result = [TWO, THREE]

让我们分析一下这个输出。

  1. 第一个要处理的元素是one。您可以看到它被过滤掉了。
  2. 第二个是two。此元素通过了过滤器,然后被映射到大写。然后它被添加到结果列表中。
  3. 第三个是three,它也通过了过滤器,并且在被添加到结果列表之前也被映射到大写。
  4. 第四个也是最后一个是four,它被过滤步骤拒绝了。

您在本教程的前面部分看到过一个点,现在很明显地出现了:流确实会逐个处理它必须处理的所有元素,从流的开头到结尾。这在之前已经提到过,现在您可以看到它在实际应用中的效果。

您可以看到,这个 peek(System.out::println) 模式非常有用,可以逐个跟踪流处理的元素,而无需调试代码。调试流很困难,因为您需要小心放置断点。大多数情况下,在流处理上放置断点会将您带到 Stream 接口的实现。这不是您需要的。大多数情况下,您需要在 lambda 表达式的代码中放置这些断点。


上次更新: 2021 年 9 月 14 日


当前教程
在流上添加中间操作
系列中的下一篇

系列中的上一篇: 使用流 API 在内存中处理数据

系列中的下一篇: 创建流