1. 引言

本教程将深入探讨Java Stream API中Collections.parallelStream()stream().parallel()两种并行流实现方式。Java 8为Collection接口引入了parallelStream()方法,同时为BaseStream接口引入了parallel()方法。

2. Java中的并行流与并行处理

并行流通过在多个CPU核心上并行执行流操作,使我们能够利用多核处理能力。数据被拆分成多个子流,操作并行执行,最终结果聚合形成最终输出。

Java中创建的流默认是串行的,除非明确指定。我们可以通过两种方式将流转换为并行流:

  • 调用Collections.parallelStream()
  • 调用BaseStream.parallel()

当执行并行流操作时,如果流操作未指定处理顺序,Java编译器和运行时会自动决定处理序列以获得最佳并行计算收益。

例如,给定一个很长的Book对象列表,我们需要统计指定年份出版的书籍数量:

public class Book {
    private String name;
    private String author;
    private int yearPublished;
    
    // getters and setters
}

这里我们可以利用并行流高效完成统计,因为执行顺序不影响最终结果,使其成为并行流操作的完美候选场景。

3. Collections.parallelStream()的使用

在应用中使用并行流的一种方式是在数据源上调用parallelStream()此操作返回一个可能并行的流,以集合作为其数据源。将此方法应用到我们的示例中,统计特定年份出版的书籍数量:

long usingCollectionsParallel(Collection<Book> listOfbooks, int year) {
    AtomicLong countOfBooks = new AtomicLong();
    listOfbooks.parallelStream()
      .forEach(book -> {
          if (book.getYearPublished() == year) {
              countOfBooks.getAndIncrement();
          }
      });
    return countOfBooks.get();
}

parallelStream()的默认实现通过集合的Spliterator<T>接口创建并行流。Spliterator是用于遍历和分区其源元素的对象。Spliterator可以通过其trySplit()方法分区部分元素,使其支持可能的并行操作。

Spliterator API类似于Iterator,允许遍历源元素,专为支持高效并行遍历而设计。在调用parallelStream()时,将使用集合的默认Spliterator

4. 在流上使用parallel()方法

我们可以先将集合转换为流,然后通过调用parallel()将其转换为并行流,实现相同结果。获得并行流后,我们可以用与之前相同的方式查找结果:

long usingStreamParallel(Collection<Book> listOfBooks, int year) {
    AtomicLong countOfBooks = new AtomicLong();
    listOfBooks.stream().parallel()
      .forEach(book -> {
          if (book.getYearPublished() == year) {
              countOfBooks.getAndIncrement();
          }
      });
    return countOfBooks.get();
}

Stream API的BaseStream接口会尽可能多地拆分底层数据(取决于源集合的默认Spliterator),然后使用Fork-Join框架将其转换为并行流。

两种方法产生的结果完全相同。

5. parallelStream()与stream().parallel()的区别

Collections.parallelStream()使用源集合的默认Spliterator来拆分数据源以实现并行执行。均匀拆分数据源对实现正确的并行执行至关重要。不均匀拆分的数据源在并行执行中造成的危害比串行执行更大。

在之前的示例中,我们一直使用List<Book>来存储书籍列表。现在尝试通过重写Collection<T>接口来创建书籍的自定义集合。

需要记住:重写接口意味着我们必须实现基接口中定义的所有抽象方法。但我们会发现spliterator()stream()parallelStream()等方法在接口中是默认方法,它们有默认实现,但我们仍然可以重写这些实现。

我们将书籍的自定义集合命名为MyBookContainer,并定义自己的Spliterator:

public class BookSpliterator<T> implements Spliterator<T> {
    private final Object[] books;
    private int startIndex;
    public BookSpliterator(Object[] books, int startIndex) {
        this.books = books;
        this.startIndex = startIndex;
    }
    
    @Override
    public Spliterator<T> trySplit() {
        // 假设源数据太小无法分割,始终返回null
        return null;
    }

    // 其他重写方法如tryAdvance(), estimateSize()等
}

在上述代码中,我们的Spliterator版本接收一个Object数组(本例中为Book)进行分割,但在trySplit()方法中始终返回null。

⚠️ 注意:这种Spliterator接口实现存在缺陷,它不会将数据均匀分区,而是返回null,导致数据不平衡。这里仅用于演示目的。

我们将在自定义集合类MyBookContainer中使用此Spliterator:

public class MyBookContainer<T> implements Collection<T> {
    private static final long serialVersionUID = 1L;
    private T[] elements;

    public MyBookContainer(T[] elements) {
        this.elements = elements;
    }

    @Override
    public Spliterator<T> spliterator() {
        return new BookSpliterator(elements, 0);
    }

    @Override
    public Stream<T> parallelStream() {
        return StreamSupport.stream(spliterator(), false);
    }
    
    // Collection接口的标准重写方法
}

现在尝试在自定义容器类中存储数据并执行并行流操作:

long usingWithCustomSpliterator(MyBookContainer<Book> listOfBooks, int year) {
    AtomicLong countOfBooks = new AtomicLong();
    listOfBooks.parallelStream()
      .forEach(book -> {
          if (book.getYearPublished() == year) {
              countOfBooks.getAndIncrement();
          }
      });
    return countOfBooks.get();
}

本例中的数据源是MyBookContainer类型的实例。这段代码内部使用我们的自定义Spliterator来拆分数据源。结果产生的并行流最多只能是串行流。

我们利用parallelStream()方法返回了一个串行流,尽管其名称暗示是并行流。这正是它与stream().parallel()的区别所在,后者总是尝试返回其提供的流的并行版本。Java在其文档中记录了这种行为:

@implSpec
 * 默认实现从集合的Spliterator创建并行流。
  *
  * @return 此集合上可能并行的流
 * @since 1.8
  */
default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

6. 结论

本文介绍了从集合数据源创建并行流的两种方式。通过实现自定义的Collection和Spliterator,我们揭示了parallelStream()stream().parallel()之间的关键区别。

所有代码示例可在GitHub上找到。


原始标题:Difference Between parallelStream() and stream().parallel() in Java