1. 概述
Java的流API是一个强大且灵活的数据处理工具。流操作本质上是对一组数据进行单次迭代。
然而,有时我们希望对流的不同部分进行不同的处理,并得到多个结果集。在本教程中,我们将学习如何将流分割成多个组并独立处理它们。
2. 使用收集器
流应该只被操作一次,并有一个终端操作。 它可以有多个中间操作,但在关闭之前数据只能被收集一次。
这意味着流API规范明确禁止将流分支,并为每个分支使用不同的中间操作。这会导致多个终端操作。然而,我们可以在终端操作内部分割流。这样就创建了一个分成两或多个部分的结果。
2.1. 二元分割与partitioningBy
如果我们想要将流分为两部分,可以使用Collectors
类的partitioningBy
方法。它接受一个谓词,并返回一个映射,将满足谓词的元素放在Boolean
值true
的键下,其余的放在false
的键下。
假设我们有一个包含文章信息的文章列表,包括它们的目标发布站点和是否为特色文章。
List<Article> articles = Lists.newArrayList(
new Article("Baeldung", true),
new Article("Baeldung", false),
new Article("Programming Daily", false),
new Article("The Code", false));
我们将它们分为两组,一组只包含Baeldung的文章,另一组包含剩余的文章:
Map<Boolean, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));
让我们看看哪些文章分别被放在true
和false
键下:
assertThat(groupedArticles.get(true)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
new Article("Programming Daily", false),
new Article("The Code", false));
2.2. 根据函数分割与groupingBy
如果我们需要更多的分类,就需要使用groupingBy
方法。它接受一个函数,用于将每个元素分类到一个组。然后它返回一个链接每个组分类器到其元素集合的映射。
假设我们想根据目标站点对文章进行分组。返回的映射将包含站点名称作为键,与给定站点关联的文章集合作为值:
Map<String, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));
3. 使用teeing
自Java 12以来,我们有了另一种二元分割的选项。我们可以使用teeing
收集器。teeing
将两个收集器组合成一个复合收集器。每个元素都会被两者处理,然后使用提供的合并函数将它们合并为单个返回值。
3.1. 使用teeing
与谓词
teeing
收集器与Collectors
类中的另一个收集器filtering
配合得很好。 它接受一个谓词,使用它来过滤处理过的元素,然后将它们传递给另一个收集器。
让我们将文章分为Baeldung和非Baeldung两类,并计算它们的数量。我们将使用List
构造函数作为合并函数:
List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
List::of));
assertThat(countedArticles.get(0)).isEqualTo(2);
assertThat(countedArticles.get(1)).isEqualTo(2);
3.2. 使用teeing
处理重叠结果
这个解决方案与之前的区别在于,我们之前创建的组没有重叠,源流中的每个元素最多属于一个组。使用teeing
,我们不再受此限制,因为每个收集器都可能处理整个流。让我们看看如何利用这一点。
我们可能希望将文章分为两类:仅包含特色文章的一类和仅包含Baeldung文章的一类。由于一篇文章可能同时是特色文章和针对Baeldung的,所以结果集可能会重叠。
这次我们将不再计数,而是将它们收集到列表中:
List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
Collectors.filtering(article -> article.featured, Collectors.toList()),
List::of));
assertThat(groupedArticles.get(0)).hasSize(2);
assertThat(groupedArticles.get(1)).hasSize(1);
assertThat(groupedArticles.get(0)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));
4. 使用RxJava
虽然Java的流API是一个有用的工具,但有时还不够。其他解决方案,如由RxJava提供的响应式流,可能能帮助我们。让我们看一个简短的例子,说明如何使用Observable
和多个Subscriber
来实现与流示例相同的结果。
4.1. 创建Observable
首先,我们需要从文章列表创建一个Observable
实例。 我们可以使用Observable
类的from
工厂方法:
Observable<Article> observableArticles = Observable.from(articles);
4.2. 过滤Observables
接下来,我们需要创建过滤文章的Observables
。为此,我们将使用Observable
类的filter
方法:
Observable<Article> baeldungObservable = observableArticles.filter(
article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
article -> article.featured);
4.3. 创建多个Subscriber
最后,我们需要订阅Observables
并提供一个动作,描述我们想要如何处理文章。 在实际应用中,这可能是将它们保存到数据库或发送给客户端,但我们暂时将其简化为添加到列表中:
List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);
5. 总结
在本教程中,我们学习了如何将流分割成组并分别处理它们。首先,我们探讨了旧版的流API方法:groupingBy
和partitioningBy
。接着,我们使用了Java 12引入的teeing
方法的新方法。最后,我们看到了如何使用RxJava来实现类似的结果,具有更大的弹性。
如往常一样,源代码可在GitHub上获取。