1. 概述

Java Stream API 提供了多种方法,允许对流元素进行修改。然而,这些方法内部的操作必须是非干扰性和无状态的。否则,可能会导致行为不正确或输出错误。

本教程将讨论在 Java 流中修改元素时常见的错误,并提供正确的处理方式。

2. 修改流元素的状态

以一个名为 Person 类的列表为例:

public class Person {
    private String name;
    private String email;

    public Person(String name, String email) {
        this.name = name;
        this.email = email;
    }
    //standard getters and setters..
}

我们将修改流中的 Person 元素的电子邮件地址并转换为大写。

2.1. 使用 forEach() 方法修改

首先,我们可以使用简单的方法,即使用 [forEach()](/foreach-java) 方法遍历列表:

@Test
void givenPersonList_whenUpdatePersonEmailByInterferingWithForEach_thenPersonEmailUpdated() {
    personList.stream().forEach(e -> e.setEmail(e.getEmail().toUpperCase()));

    personList.stream().forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

在这个方法中,当我们遍历 Person 对象列表时,会将每个元素的电子邮件地址转换为大写。乍看似乎合理,但违反了非干扰原则。这意味着在流管道中,我们永远不应修改原始源。

除非流源是并发的,否则在执行流管道时修改流的数据源可能导致异常、错误答案或不符合规范的行为。

2.2. 使用 peek() 方法修改

接下来,我们来看看[peek()](/java-streams-peek-api) 方法。我们常常想用它来修改流中元素的属性:

@Test
void givenPersonList_whenUpdatePersonEmailByInterferingWithPeek_thenPersonEmailUpdated() {
    personList.stream()
      .peek(e -> e.setEmail(e.getEmail().toUpperCase()))
      .collect(Collectors.toList());

    personList.forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

同样地,通过更新 personList 源,我们重复了前面章节中提到的错误。

2.3. 使用 map() 方法修改

forEach() 是流管道中的终止操作,而[map()](/java-8-streams-introduction#3-mapping)peek() 一样是中间操作,返回一个 Stream。在 map() 中,我们将创建一个新的带有大写电子邮件的 Person 对象,然后收集到一个新的列表:

@Test
void givenPersonList_whenUpdatePersonEmailWithMapMethod_thenPersonEmailUpdated() {
    List<Person> newPersonList = personList.stream()
      .map(e -> new Person(e.getName(), e.getEmail().toUpperCase()))
      .collect(Collectors.toList());

    newPersonList.forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

在这个方法中,我们没有修改原始列表,而是从中创建了一个新的列表 newPersonList。因此,它是非干扰的。它也是无状态的,因为其中操作的结果不会相互影响,通常独立进行。无论是否是顺序还是并行处理,都推荐遵循这些原则。

考虑到函数式编程的本质之一是不可变性,我们可以尝试创建一个不可变的 Person 类:

public class ImmutablePerson {

    private String name;
    private String email;

    public ImmutablePerson(String name, String email) {
        this.name = name;
        this.email = email;
    }

    public ImmutablePerson withEmail(String email) {
        return new ImmutablePerson(this.name, email);
    }
    //Standard getters
}

ImmutablePerson 类没有设置方法,但它提供了一个方法 withEmail(),用于返回一个新的 ImmutablePerson,其中电子邮件地址为大写。

现在,让我们在流中使用它来修改元素:

@Test
void givenPersonList_whenUpdateImmutablePersonEmailWithMapMethod_thenPersonEmailUpdated() {
    List<ImmutablePerson> newImmutablePersonList = immutablePersonList.stream()
      .map(e -> e.withEmail(e.getEmail().toUpperCase()))
      .collect(Collectors.toList());

    newImmutablePersonList.forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

这样就强制实施了非干扰性。

3. 从流中移除元素

在流中执行结构更改更为棘手。这比修改操作更昂贵,如果不小心,可能会导致不一致和不期望的结果。让我们详细探讨这个问题。

3.1. 使用 forEach() 方法移除元素

如果我们想要从流中删除一些元素呢?例如,从列表中删除名字为 John 的人:

@Test
void givenPersonList_whenRemoveWhileIterating_thenThrowException() {
    assertThrows(NullPointerException.class, () -> {
        personList.stream().forEach(e -> {
            if(e.getName().equals("John")) {
                personList.remove(e);
            }
        });
    });
}

我们在 forEach() 方法中尝试在迭代过程中修改列表的结构。令人惊讶的是,这会导致 NullPointerException,与在 ArrayList 中的 forEach() 不同,后者会抛出 [ConcurrentModificationException](/java-concurrentmodificationexception)

@Test
void givenPersonList_whenRemoveWhileIteratingWithForEach_thenThrowException() {
    assertThrows(ConcurrentModificationException.class, () -> {
        personList.forEach(e -> {
            if(e.getName().equals("John")) {
                personList.remove(e);
            }
        });
    });
}

3.2. 使用 CopyOnWriteArrayList 移除元素

CopyOnWriteArrayListArrayList 的线程安全版本。在迭代期间可以从中删除元素:

@Test
void givenPersonList_whenRemoveWhileIterating_thenPersonRemoved() {
    assertEquals(4, personList.size());
    
    CopyOnWriteArrayList<Person> cps = new CopyOnWriteArrayList<>(personList);
    cps.stream().forEach(e -> {
        if(e.getName().equals("John")) {
            cps.remove(e);
        }
    });

    assertEquals(3, cps.size());
}

虽然它可以防止多个线程之间的干扰,但代价过高,因为每次写入操作都会创建快照。

3.3. 使用 filter() 方法移除元素

Java Stream API 提供了更优雅的方法 filter() 来删除流中的元素:

@Test
void givenPersonList_whenRemovePersonWithFilter_thenPersonRemoved() {
    assertEquals(4, personList.size());

    List<Person> newPersonList = personList.stream()
      .filter(e -> !e.getName().equals("John"))
      .collect(Collectors.toList());

    assertEquals(3, newPersonList.size());
}

在上述方法中,filter() 只允许名字不为 JohnPerson 对象继续流管道。再次强调,filter() 方法内的谓词(/java-predicate-chain#basic)应是非干扰和无状态的。这种方法看起来更简洁,易于理解和调试。

4. 总结

在这篇文章中,我们探讨了在流中正确修改元素的方式。确保管道处理是非干扰和无状态的至关重要,否则可能导致意外结果。

如往常一样,本文所使用的代码可以在 GitHub 上找到。