1. 概述

Java的流API是一个强大且灵活的数据处理工具。流操作本质上是对一组数据进行单次迭代。

然而,有时我们希望对流的不同部分进行不同的处理,并得到多个结果集。在本教程中,我们将学习如何将流分割成多个组并独立处理它们。

2. 使用收集器

流应该只被操作一次,并有一个终端操作。 它可以有多个中间操作,但在关闭之前数据只能被收集一次。

这意味着流API规范明确禁止将流分支,并为每个分支使用不同的中间操作。这会导致多个终端操作。然而,我们可以在终端操作内部分割流。这样就创建了一个分成两或多个部分的结果。

2.1. 二元分割与partitioningBy

如果我们想要将流分为两部分,可以使用Collectors类的partitioningBy方法。它接受一个谓词,并返回一个映射,将满足谓词的元素放在Booleantrue的键下,其余的放在false的键下。

假设我们有一个包含文章信息的文章列表,包括它们的目标发布站点和是否为特色文章。

List<Article> articles = Lists.newArrayList(
  new Article("Baeldung", true),
  new Article("Baeldung", false),
  new Article("Programming Daily", false),
  new Article("The Code", false));

我们将它们分为两组,一组只包含Baeldung的文章,另一组包含剩余的文章:

Map<Boolean, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));

让我们看看哪些文章分别被放在truefalse键下:

assertThat(groupedArticles.get(true)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
  new Article("Programming Daily", false),
  new Article("The Code", false));

2.2. 根据函数分割与groupingBy

如果我们需要更多的分类,就需要使用groupingBy方法。它接受一个函数,用于将每个元素分类到一个组。然后它返回一个链接每个组分类器到其元素集合的映射。

假设我们想根据目标站点对文章进行分组。返回的映射将包含站点名称作为键,与给定站点关联的文章集合作为值:

Map<String, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));

3. 使用teeing

自Java 12以来,我们有了另一种二元分割的选项。我们可以使用teeing收集器。teeing将两个收集器组合成一个复合收集器。每个元素都会被两者处理,然后使用提供的合并函数将它们合并为单个返回值。

3.1. 使用teeing与谓词

teeing收集器与Collectors类中的另一个收集器filtering配合得很好。 它接受一个谓词,使用它来过滤处理过的元素,然后将它们传递给另一个收集器。

让我们将文章分为Baeldung和非Baeldung两类,并计算它们的数量。我们将使用List构造函数作为合并函数:

List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
  Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
  List::of));
assertThat(countedArticles.get(0)).isEqualTo(2);
assertThat(countedArticles.get(1)).isEqualTo(2);

3.2. 使用teeing处理重叠结果

这个解决方案与之前的区别在于,我们之前创建的组没有重叠,源流中的每个元素最多属于一个组。使用teeing,我们不再受此限制,因为每个收集器都可能处理整个流。让我们看看如何利用这一点。

我们可能希望将文章分为两类:仅包含特色文章的一类和仅包含Baeldung文章的一类。由于一篇文章可能同时是特色文章和针对Baeldung的,所以结果集可能会重叠。

这次我们将不再计数,而是将它们收集到列表中:

List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
  Collectors.filtering(article -> article.featured, Collectors.toList()),
  List::of));

assertThat(groupedArticles.get(0)).hasSize(2);
assertThat(groupedArticles.get(1)).hasSize(1);

assertThat(groupedArticles.get(0)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));

4. 使用RxJava

虽然Java的流API是一个有用的工具,但有时还不够。其他解决方案,如由RxJava提供的响应式流,可能能帮助我们。让我们看一个简短的例子,说明如何使用Observable和多个Subscriber来实现与流示例相同的结果。

4.1. 创建Observable

首先,我们需要从文章列表创建一个Observable实例。 我们可以使用Observable类的from工厂方法:

Observable<Article> observableArticles = Observable.from(articles);

4.2. 过滤Observables

接下来,我们需要创建过滤文章的Observables。为此,我们将使用Observable类的filter方法:

Observable<Article> baeldungObservable = observableArticles.filter(
  article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
  article -> article.featured);

4.3. 创建多个Subscriber

最后,我们需要订阅Observables并提供一个动作,描述我们想要如何处理文章。 在实际应用中,这可能是将它们保存到数据库或发送给客户端,但我们暂时将其简化为添加到列表中:

List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);

5. 总结

在本教程中,我们学习了如何将流分割成组并分别处理它们。首先,我们探讨了旧版的流API方法:groupingBypartitioningBy。接着,我们使用了Java 12引入的teeing方法的新方法。最后,我们看到了如何使用RxJava来实现类似的结果,具有更大的弹性。

如往常一样,源代码可在GitHub上获取。