1. Overview

In this article, we will be looking at a java.util.Stream API and we’ll see how we can use that construct to operate on an infinite stream of data/elements.

The possibility of working on the infinite sequence of elements is predicated on the fact that streams are built to be lazy.

This laziness is achieved by a separation between two types of the operations that could be executed on streams: intermediate and terminal operations.

2. Intermediate and Terminal Operations

All Stream operations are divided into intermediate and terminal operations and are combined to form stream pipelines.

A stream pipeline consists of a source (such as a Collection, an array, a generator function, an I/O channel, or infinite sequence generator); followed by zero or more intermediate operations and a terminal operation.

2.1. Intermediate Operations

Intermediate operations are not executed until some terminal operation is invoked.

They’re composed forming a pipeline of a Stream execution. The intermediate operation can be added to a Stream pipeline by methods:

  • filter()
  • map()
  • flatMap()
  • distinct()
  • sorted()
  • peek()
  • limit()
  • skip()

All Intermediate operations are lazy, so they’re not executed until a result of a processing is actually needed.

Basically, intermediate operations return a new stream. Executing an intermediate operation does not actually perform any operation, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate.

As such, traversal of the Stream doesn’t begin until the terminal operation of the pipeline is executed.

That is very important property, specifically important for infinite streams – because it allows us to create streams that will be actually invoked only when a Terminal operation is called.

2.2. Terminal Operations

Terminal operations may traverse the stream to produce a result or a side effect.

After the terminal operation is performed, the stream pipeline is considered consumed, and can no longer be used. In almost all cases, terminal operations are eager, completing their traversal of the data source and processing of the pipeline before returning.

The eagerness of a terminal operation is important concerning infinite streams because at the moment of processing we need to think carefully if our Stream is properly bounded by, for example, a limit() transformation. Terminal operations are:

  • forEach()
  • forEachOrdered()
  • toArray()
  • reduce()
  • collect()
  • min()
  • max()
  • count()
  • anyMatch()
  • allMatch()
  • noneMatch()
  • findFirst()
  • findAny()

Each of these operations will trigger execution of all intermediate operations.

3. Infinite Streams

Now that we understand these two concepts – Intermediate and Terminal operations – we’re able to write an infinite stream that leverage laziness of Streams.

Let’s say that we want to create an infinite stream of elements from zero that will be incremented by two. Then we need to limit that sequence before calling terminal operation.

It is crucial to use a limit() method before executing a collect() method that is a terminal operation, otherwise our program will run indefinitely:

// given
Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 2);

// when
List<Integer> collect = infiniteStream

// then
assertEquals(collect, Arrays.asList(0, 2, 4, 6, 8, 10, 12, 14, 16, 18));

We created an infinite stream using an iterate() method. Then we called a limit() transformation and a collect() terminal operation. Then in our resulting List, we will have first 10 elements of an infinite sequence due to a laziness of a Stream.

4. Infinite Stream of a Custom Type of Elements

Let’s say that we want to create an infinite stream of random UUIDs.

The first step to achieving this using Stream API is to create a Supplier of those random values:

Supplier<UUID> randomUUIDSupplier = UUID::randomUUID;

When we define a supplier we can create an infinite stream using a generate() method:

Stream<UUID> infiniteStreamOfRandomUUID = Stream.generate(randomUUIDSupplier);

Then we could take a couple of elements from that stream. We need to remember to use a limit() method if we want our program to finish in a finite time:

List<UUID> randomInts = infiniteStreamOfRandomUUID

We use a skip() transformation to discard first 10 results and take the next 10 elements. We can create an infinite stream of any custom type elements by passing a function of a Supplier interface to a generate() method on a Stream.

6. Do-While – the Stream Way

Let’s say that we have a simple do..while loop in our code:

int i = 0;
while (i < 10) {

We are printing i counter ten times. We can expect that such construct can be easily written using Stream API and ideally, we would have a doWhile() method on a stream.

Unfortunately, there is no such method on a stream and when we want to achieve functionality similar to standard do-while loop we need to use a limit() method:

Stream<Integer> integers = Stream
  .iterate(0, i -> i + 1);

We achieved same functionality like an imperative while loop with less code, but call to the limit() function is not as descriptive as it would be if we had a doWhile() method on a Stream object.

5. Conclusion

This article explains how we can use the Stream API to create infinite streams. These, when used together with transformations such as limit() – can make some scenarios quite a bit easier to understand and implement.

The code supporting of all these examples can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.

« 上一篇: 如何测试RxJava?