1. Introduction
In this tutorial, we’ll explore the Collection.parallelStream() and stream().parallel() methods of the Java Streams API. Java 8 added the parallelStream() method to the Collection interface and the parallel() method to the BaseStream interface.
2. Parallel Stream and Parallelism in Java
A parallel stream lets us use multiple CPU cores by executing a stream operation in parallel. The data is split into multiple sub-streams, the intended operation is performed on them in parallel, and finally the partial results are combined to form the final output.
A stream created in Java is sequential by default unless specified otherwise. We can turn a stream into a parallel stream in two ways:
- we can invoke Collection.parallelStream()
- we can invoke BaseStream.parallel()
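As a quick sketch (the class and method names here are hypothetical), we can compare a plain stream with both approaches using isParallel(), which BaseStream defines to report whether a terminal operation would execute in parallel:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelStreamWays {

    // Reports whether each way of obtaining a stream yields a parallel stream
    public static boolean[] parallelFlags(List<String> source) {
        Stream<String> sequential = source.stream();               // sequential by default
        Stream<String> viaCollection = source.parallelStream();    // Collection.parallelStream()
        Stream<String> viaBaseStream = source.stream().parallel(); // BaseStream.parallel()
        return new boolean[] {
            sequential.isParallel(),
            viaCollection.isParallel(),
            viaBaseStream.isParallel()
        };
    }

    public static void main(String[] args) {
        boolean[] flags = parallelFlags(Arrays.asList("a", "b", "c"));
        System.out.println("stream():            " + flags[0]); // false
        System.out.println("parallelStream():    " + flags[1]); // true
        System.out.println("stream().parallel(): " + flags[2]); // true
    }
}
```

For a standard collection such as the list returned by Arrays.asList(), both approaches produce a stream flagged as parallel; no terminal operation is needed to inspect the flag.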
Unless the stream operation requires a particular order (as forEachOrdered() does, for example), the runtime is free to process the elements of a parallel stream in whatever order gives the best parallel performance.
For example, suppose we’re given a very long list of Book objects and have to count the books published in a specified year:
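To illustrate the difference, here is a small sketch (hypothetical class name) that traverses the same parallel IntStream with forEach(), which makes no ordering promise, and with forEachOrdered(), which respects the encounter order. It also shows that collect() keeps the encounter order in its result even though the elements may be processed out of order:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EncounterOrderDemo {

    // forEachOrdered() processes elements in encounter order, even on a parallel stream
    public static List<Integer> orderedTraversal(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().boxed().forEachOrdered(seen::add);
        return seen;
    }

    // forEach() on a parallel stream may process elements in any order
    public static List<Integer> unorderedTraversal(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().boxed().forEach(seen::add);
        return seen;
    }

    // collect() assembles the result in encounter order, whatever the processing order
    public static List<Integer> collected(int n) {
        return IntStream.range(0, n).parallel().boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("forEachOrdered: " + orderedTraversal(10));
        System.out.println("forEach:        " + unorderedTraversal(10)); // order may vary between runs
        System.out.println("collect:        " + collected(10));
    }
}
```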
public class Book {
    private String name;
    private String author;
    private int yearPublished;

    // getters and setters
}
We can use a parallel stream here and potentially compute the count faster than doing it sequentially. The order of execution does not affect the final result in our example, which makes it a good candidate for a parallel Stream operation.
3. Usage of Collection.parallelStream()
One way to use parallel streams in our application is to invoke parallelStream() on the data source. This method returns a possibly parallel Stream with the collection as its source. We can apply it to our example to find the count of books published in a specific year:
long usingCollectionsParallel(Collection<Book> listOfBooks, int year) {
    AtomicLong countOfBooks = new AtomicLong();
    listOfBooks.parallelStream()
      .forEach(book -> {
          if (book.getYearPublished() == year) {
              // AtomicLong keeps the shared counter thread-safe
              countOfBooks.getAndIncrement();
          }
      });
    return countOfBooks.get();
}
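The shared AtomicLong works, but the same count can be expressed without any shared mutable state by combining filter() and count(), letting the stream aggregate the partial results itself. The following is a self-contained sketch; BookCounter and the one-field Book stand-in are hypothetical simplifications of the article’s Book class:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class BookCounter {

    // Minimal stand-in for the article's Book class: only the field we filter on
    public static class Book {
        private final int yearPublished;

        public Book(int yearPublished) {
            this.yearPublished = yearPublished;
        }

        public int getYearPublished() {
            return yearPublished;
        }
    }

    // Sequential version: keep the matching books, then let count() aggregate
    public static long countSequential(Collection<Book> books, int year) {
        return books.stream()
          .filter(book -> book.getYearPublished() == year)
          .count();
    }

    // Parallel version: same pipeline, no shared AtomicLong required
    public static long countParallel(Collection<Book> books, int year) {
        return books.parallelStream()
          .filter(book -> book.getYearPublished() == year)
          .count();
    }

    public static void main(String[] args) {
        List<Book> books = Arrays.asList(new Book(1999), new Book(2005), new Book(1999));
        System.out.println(countSequential(books, 1999)); // 2
        System.out.println(countParallel(books, 1999));   // 2
    }
}
```

Because counting is associative and order-independent, the sequential and parallel pipelines return the same value.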
The default implementation of the parallelStream() method creates a parallel Stream from the Collection’s Spliterator.
The Spliterator API, similar to an Iterator, allows traversal of the elements of its source, but it was designed to support efficient parallel traversal. Unless the collection overrides it, parallelStream() uses the Collection’s default Spliterator.
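To see what splitting means in practice, this sketch (hypothetical class name) calls trySplit() once on an ArrayList’s Spliterator. trySplit() returns a new Spliterator covering a prefix of the elements and leaves the rest in the original one; for ArrayList the range is cut in half:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorBasics {

    // Splits an ArrayList's spliterator once and drains both parts into lists
    public static List<List<Integer>> splitOnce(List<Integer> source) {
        Spliterator<Integer> rest = new ArrayList<>(source).spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if the source is too small to split

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(first::add);
        }
        rest.forEachRemaining(second::add);
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = splitOnce(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7));
        System.out.println("prefix: " + parts.get(0)); // prefix: [0, 1, 2, 3]
        System.out.println("rest:   " + parts.get(1)); // rest:   [4, 5, 6, 7]
    }
}
```

A parallel stream applies this step recursively, handing the resulting pieces to different worker threads.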
4. Usage of parallel() on a Stream
We can achieve the same result by first converting the collection to a Stream. We can then turn the resulting sequential stream into a parallel Stream by calling parallel() on it. Once we have a parallel Stream, we can find our result the same way as above:
long usingStreamParallel(Collection<Book> listOfBooks, int year) {
    AtomicLong countOfBooks = new AtomicLong();
    listOfBooks.stream().parallel()
      .forEach(book -> {
          if (book.getYearPublished() == year) {
              countOfBooks.getAndIncrement();
          }
      });
    return countOfBooks.get();
}
Calling parallel() doesn’t split anything by itself; it only marks the stream as parallel. When the terminal operation runs, the Streams API splits the underlying data as far as the source collection’s Spliterator allows and processes the pieces using the Fork/Join framework (by default, in the common ForkJoinPool).
Both approaches produce the same result.
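Because parallel() only sets a flag, the last call to parallel() or sequential() before the terminal operation decides how the whole pipeline runs. The sketch below (hypothetical class name) shows this and records which threads process a parallel stream; we expect the calling thread plus, typically, ForkJoinWorkerThreads from the common pool, though the exact threads and their number vary between machines and runs:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class ParallelFlagDemo {

    // The last parallel()/sequential() call wins for the whole pipeline
    public static boolean lastCallWins() {
        return IntStream.range(0, 10).parallel().sequential().isParallel();
    }

    // Records every thread that processes an element of a parallel stream
    public static Set<Thread> workerThreads(int n) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n).parallel().forEach(i -> threads.add(Thread.currentThread()));
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("parallel().sequential() is parallel? " + lastCallWins()); // false
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        for (Thread t : workerThreads(100_000)) {
            System.out.println(t.getName() + " isForkJoinWorker=" + (t instanceof ForkJoinWorkerThread));
        }
    }
}
```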
5. Difference Between parallelStream() and stream().parallel()
Collection.parallelStream() uses the source collection’s default Spliterator to split the data source for parallel execution. Splitting the data source evenly is important for efficient parallel execution: an unevenly split data source can perform worse in parallel than its sequential counterpart.
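How evenly a source splits depends on its Spliterator. As a rough sketch (hypothetical class name), we can compare the first split of an ArrayList, which can jump straight to its middle, with that of a LinkedList, which cannot. In the OpenJDK versions we’re aware of, LinkedList’s Spliterator peels a fixed-size batch off the front instead of halving, so we only print its numbers rather than asserting them:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SplitBalanceDemo {

    // Returns {prefixSize, remainingSize} after a single trySplit() call
    public static long[] firstSplitSizes(Collection<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        ArrayList<Integer> arrayList = IntStream.range(0, 10_000).boxed()
          .collect(Collectors.toCollection(ArrayList::new));
        LinkedList<Integer> linkedList = new LinkedList<>(arrayList);

        long[] a = firstSplitSizes(arrayList);
        long[] l = firstSplitSizes(linkedList);
        System.out.println("ArrayList  split: " + a[0] + " / " + a[1]); // 5000 / 5000
        System.out.println("LinkedList split: " + l[0] + " / " + l[1]); // uneven, implementation-dependent
    }
}
```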
We have been using List<Book> to hold our books in all of these examples. Let’s now create a custom Collection for our books by implementing the Collection<T> interface.
We should remember that implementing an interface means we have to provide our own implementation of the abstract methods it defines. However, methods such as spliterator(), stream(), and parallelStream() are default methods in the Collection interface, so an implementation is already provided for them. We can nonetheless override those implementations with our own versions.
We’ll call our custom Collection of books MyBookContainer, and we’ll define our own Spliterator as well:
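We can confirm which Collection methods already come with an implementation through reflection: Method.isDefault() reports whether an interface method is a default method. A small sketch with a hypothetical class name:

```java
import java.lang.reflect.Method;
import java.util.Collection;

public class CollectionDefaults {

    // Reports whether the named no-arg method of Collection is a default method
    public static boolean isDefault(String methodName) {
        try {
            Method method = Collection.class.getMethod(methodName);
            return method.isDefault();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"spliterator", "stream", "parallelStream", "size"}) {
            System.out.println(name + " is default: " + isDefault(name));
        }
        // spliterator, stream and parallelStream print true; size is abstract and prints false
    }
}
```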
import java.util.Spliterator;

public class BookSpliterator<T> implements Spliterator<T> {
    private final Object[] books;
    private int startIndex;

    public BookSpliterator(Object[] books, int startIndex) {
        this.books = books;
        this.startIndex = startIndex;
    }

    @Override
    public Spliterator<T> trySplit() {
        // Always assume the source is too small to split, so return null
        return null;
    }

    // Other overridden methods such as tryAdvance(), estimateSize(), etc.
}
In the above code snippet, our version of the Spliterator takes an array of Objects (in our case, Books) to split, and its trySplit() method always returns null.
We should note that this implementation of the Spliterator therefore never splits its source: any stream built on it processes all of its elements as a single chunk.
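The snippet above elides tryAdvance(), estimateSize() and characteristics(). One possible way to fill them in is sketched below under a hypothetical name; it traverses the array from startIndex onward and reports an exact size, while still refusing to split:

```java
import java.util.Spliterator;
import java.util.function.Consumer;

public class BookSpliteratorSketch<T> implements Spliterator<T> {
    private final Object[] books;
    private int startIndex;

    public BookSpliteratorSketch(Object[] books, int startIndex) {
        this.books = books;
        this.startIndex = startIndex;
    }

    @Override
    public Spliterator<T> trySplit() {
        // Never split: the whole array is always traversed as one chunk
        return null;
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean tryAdvance(Consumer<? super T> action) {
        if (startIndex >= books.length) {
            return false; // no elements left
        }
        action.accept((T) books[startIndex++]);
        return true;
    }

    @Override
    public long estimateSize() {
        // Exact number of elements not yet traversed
        return books.length - startIndex;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED;
    }

    public static void main(String[] args) {
        Spliterator<String> s = new BookSpliteratorSketch<>(new Object[] {"A", "B", "C"}, 0);
        System.out.println(s.trySplit());     // null
        System.out.println(s.estimateSize()); // 3
        s.forEachRemaining(System.out::println);
    }
}
```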
We will use this Spliterator in our custom Collection class MyBookContainer:
import java.util.Collection;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class MyBookContainer<T> implements Collection<T> {
    private T[] elements;

    public MyBookContainer(T[] elements) {
        this.elements = elements;
    }

    @Override
    public Spliterator<T> spliterator() {
        return new BookSpliterator<>(elements, 0);
    }

    @Override
    public Stream<T> parallelStream() {
        // The false flag requests a sequential stream, despite the method's name
        return StreamSupport.stream(spliterator(), false);
    }

    // standard overridden methods of the Collection interface
}
Let’s store data in our custom container class and perform a parallel Stream operation on it:
long usingWithCustomSpliterator(MyBookContainer<Book> listOfBooks, int year) {
    AtomicLong countOfBooks = new AtomicLong();
    listOfBooks.parallelStream()
      .forEach(book -> {
          if (book.getYearPublished() == year) {
              countOfBooks.getAndIncrement();
          }
      });
    return countOfBooks.get();
}
The data source in this example is an instance of MyBookContainer, so the stream is built on our custom Spliterator. Because our parallelStream() override passes false to StreamSupport.stream(), the returned stream is sequential; and since trySplit() never splits, the data would be processed as a single chunk even if the stream were flagged as parallel.
We just made parallelStream() return a sequential stream, even though its name suggests a parallel one. This is where it differs from stream().parallel(), which always returns a stream flagged as parallel. The Javadoc of Collection.parallelStream() explicitly allows this behavior:
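Implementing every abstract method of Collection makes MyBookContainer verbose. For a runnable sketch, a hypothetical ArrayBookContainer can extend AbstractCollection instead, which only requires iterator() and size(). Unlike MyBookContainer, it relies on Collection’s default spliterator() rather than BookSpliterator, but it overrides parallelStream() the same way, so we can observe the effect with isParallel():

```java
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ArrayBookContainer<T> extends AbstractCollection<T> {
    private final T[] elements;

    public ArrayBookContainer(T[] elements) {
        this.elements = elements;
    }

    @Override
    public Iterator<T> iterator() {
        return Arrays.asList(elements).iterator();
    }

    @Override
    public int size() {
        return elements.length;
    }

    @Override
    public Stream<T> parallelStream() {
        // Same trick as MyBookContainer: request a sequential stream
        return StreamSupport.stream(spliterator(), false);
    }

    public static void main(String[] args) {
        ArrayBookContainer<String> books = new ArrayBookContainer<>(new String[] {"A", "B", "C"});
        System.out.println(books.parallelStream().isParallel());    // false
        System.out.println(books.stream().parallel().isParallel()); // true
    }
}
```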
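/**
 * Returns a possibly parallel {@code Stream} with this collection as its
 * source.  It is allowable for this method to return a sequential stream.
 * ...
 * @implSpec
 * The default implementation creates a parallel {@code Stream} from the
 * collection's {@code Spliterator}.
 *
 * @return a possibly parallel {@code Stream} over the elements in this
 * collection
 * @since 1.8
 */
default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}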
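Even when a stream is flagged as parallel, it can only spread work across threads if its Spliterator actually splits. The sketch below (hypothetical names) wraps an ArrayList’s Spliterator in one that, like BookSpliterator, always returns null from trySplit(), builds a stream with the parallel flag set, and records which threads process the elements. In OpenJDK, the parallel forEach runs its root task in the calling thread and forks only after a successful split, so we expect a single thread:

```java
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class NonSplittingDemo {

    // Delegates traversal but refuses to split, like BookSpliterator does
    static <T> Spliterator<T> nonSplitting(Spliterator<T> delegate) {
        return new Spliterator<T>() {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                return delegate.tryAdvance(action);
            }

            @Override
            public Spliterator<T> trySplit() {
                return null; // never split
            }

            @Override
            public long estimateSize() {
                return delegate.estimateSize();
            }

            @Override
            public int characteristics() {
                return delegate.characteristics();
            }
        };
    }

    // Counts the distinct threads used by a stream flagged parallel but unable to split
    public static int threadsUsed(List<Integer> source) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        StreamSupport.stream(nonSplitting(source.spliterator()), true) // parallel flag set
          .forEach(i -> threads.add(Thread.currentThread()));
        return threads.size();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());
        System.out.println("threads used: " + threadsUsed(numbers)); // threads used: 1
    }
}
```

This is why an uneven or non-splitting Spliterator defeats parallelism regardless of whether we call parallelStream() or stream().parallel().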
6. Conclusion
In this article, we covered the ways to create parallel Streams from a Collection data source. We also explored the difference between parallelStream() and stream().parallel() by implementing custom versions of a Collection and a Spliterator.
As usual, all code samples can be found over on GitHub.