1. 引言
本教程将深入探讨Java Stream API中Collections.parallelStream()
和stream().parallel()
两种并行流实现方式。Java 8为Collection
接口引入了parallelStream()
方法,同时为BaseStream
接口引入了parallel()
方法。
2. Java中的并行流与并行处理
并行流通过在多个CPU核心上并行执行流操作,使我们能够利用多核处理能力。数据被拆分成多个子流,操作并行执行,最终结果聚合形成最终输出。
Java中创建的流默认是串行的,除非明确指定。我们可以通过两种方式将流转换为并行流:
- 调用
Collections.parallelStream()
- 调用
BaseStream.parallel()
当执行并行流操作时,如果流操作未指定处理顺序,Java编译器和运行时会自动决定处理序列以获得最佳并行计算收益。
例如,给定一个很长的Book
对象列表,我们需要统计指定年份出版的书籍数量:
public class Book {
private String name;
private String author;
private int yearPublished;
// getters and setters
}
这里我们可以利用并行流高效完成统计,因为执行顺序不影响最终结果,使其成为并行流操作的完美候选场景。
3. Collections.parallelStream()的使用
在应用中使用并行流的一种方式是在数据源上调用parallelStream()
。此操作返回一个可能并行的流,以集合作为其数据源。将此方法应用到我们的示例中,统计特定年份出版的书籍数量:
long usingCollectionsParallel(Collection<Book> listOfbooks, int year) {
AtomicLong countOfBooks = new AtomicLong();
listOfbooks.parallelStream()
.forEach(book -> {
if (book.getYearPublished() == year) {
countOfBooks.getAndIncrement();
}
});
return countOfBooks.get();
}
parallelStream()
的默认实现通过集合的Spliterator<T>
接口创建并行流。Spliterator是用于遍历和分区其源元素的对象。Spliterator可以通过其trySplit()
方法分区部分元素,使其支持可能的并行操作。
Spliterator API类似于Iterator,允许遍历源元素,专为支持高效并行遍历而设计。在调用parallelStream()
时,将使用集合的默认Spliterator。
4. 在流上使用parallel()方法
我们可以先将集合转换为流,然后通过调用parallel()
将其转换为并行流,实现相同结果。获得并行流后,我们可以用与之前相同的方式查找结果:
long usingStreamParallel(Collection<Book> listOfBooks, int year) {
AtomicLong countOfBooks = new AtomicLong();
listOfBooks.stream().parallel()
.forEach(book -> {
if (book.getYearPublished() == year) {
countOfBooks.getAndIncrement();
}
});
return countOfBooks.get();
}
Stream API的BaseStream
接口会尽可能多地拆分底层数据(取决于源集合的默认Spliterator),然后使用Fork-Join框架将其转换为并行流。
两种方法产生的结果完全相同。
5. parallelStream()与stream().parallel()的区别
Collections.parallelStream()
使用源集合的默认Spliterator来拆分数据源以实现并行执行。均匀拆分数据源对实现正确的并行执行至关重要。不均匀拆分的数据源在并行执行中造成的危害比串行执行更大。
在之前的示例中,我们一直使用List<Book>
来存储书籍列表。现在尝试通过重写Collection<T>
接口来创建书籍的自定义集合。
需要记住:重写接口意味着我们必须实现基接口中定义的所有抽象方法。但我们会发现spliterator()
、stream()
和parallelStream()
等方法在接口中是默认方法,它们有默认实现,但我们仍然可以重写这些实现。
我们将书籍的自定义集合命名为MyBookContainer
,并定义自己的Spliterator:
public class BookSpliterator<T> implements Spliterator<T> {
private final Object[] books;
private int startIndex;
public BookSpliterator(Object[] books, int startIndex) {
this.books = books;
this.startIndex = startIndex;
}
@Override
public Spliterator<T> trySplit() {
// 假设源数据太小无法分割,始终返回null
return null;
}
// 其他重写方法如tryAdvance(), estimateSize()等
}
在上述代码中,我们的Spliterator版本接收一个Object数组(本例中为Book)进行分割,但在trySplit()
方法中始终返回null。
⚠️ 注意:这种Spliterator
我们将在自定义集合类MyBookContainer
中使用此Spliterator:
public class MyBookContainer<T> implements Collection<T> {
private static final long serialVersionUID = 1L;
private T[] elements;
public MyBookContainer(T[] elements) {
this.elements = elements;
}
@Override
public Spliterator<T> spliterator() {
return new BookSpliterator(elements, 0);
}
@Override
public Stream<T> parallelStream() {
return StreamSupport.stream(spliterator(), false);
}
// Collection接口的标准重写方法
}
现在尝试在自定义容器类中存储数据并执行并行流操作:
long usingWithCustomSpliterator(MyBookContainer<Book> listOfBooks, int year) {
AtomicLong countOfBooks = new AtomicLong();
listOfBooks.parallelStream()
.forEach(book -> {
if (book.getYearPublished() == year) {
countOfBooks.getAndIncrement();
}
});
return countOfBooks.get();
}
本例中的数据源是MyBookContainer
类型的实例。这段代码内部使用我们的自定义Spliterator来拆分数据源。结果产生的并行流最多只能是串行流。
我们利用parallelStream()
方法返回了一个串行流,尽管其名称暗示是并行流。这正是它与stream().parallel()
的区别所在,后者总是尝试返回其提供的流的并行版本。Java在其文档中记录了这种行为:
@implSpec
* 默认实现从集合的Spliterator创建并行流。
*
* @return 此集合上可能并行的流
* @since 1.8
*/
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
6. 结论
本文介绍了从集合数据源创建并行流的两种方式。通过实现自定义的Collection和Spliterator,我们揭示了parallelStream()
和stream().parallel()
之间的关键区别。
所有代码示例可在GitHub上找到。