1. Overview

In reactive programming, we often need to transform a Collection into a reactive stream known as a Flux. This is a crucial step when integrating an existing data structure into a reactive pipeline.

In this tutorial, we’ll explore how to transform a Collection of elements into a Flux of elements.

2. Problem Definition

The two main Publisher types in Project Reactor are Flux and Mono. A Mono emits at most one value, whereas a Flux can emit an arbitrary number of values.

When we fetch a List, we can either wrap it in a Mono<List> or convert it to a Flux. A blocking call that returns a List can be wrapped in a Mono, which emits the entire list as a single signal.

However, if we convert a large list to a Flux, the Subscriber can request data in manageable chunks. This enables the subscriber to process items one by one or in small batches.
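
To make the difference concrete, here’s a minimal sketch that assumes reactor-core is on the classpath. The class MonoVersusFluxDemo and its method names are hypothetical and serve only as an illustration. The first method wraps the list in a Mono and records each signal it receives. The second converts the list to a Flux and uses Reactor’s BaseSubscriber to request elements in batches:

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; the names are illustrative, not part of the article's code base.
final class MonoVersusFluxDemo {

    // Wraps the whole list in a Mono: the subscriber gets one signal carrying every element.
    static List<List<Integer>> signalsFromMono(List<Integer> list) {
        List<List<Integer>> signals = new ArrayList<>();
        Mono.just(list).subscribe(signals::add);
        return signals;
    }

    // Converts the list to a Flux and requests its elements batchSize at a time,
    // recording every request and every received element in order.
    static List<String> eventsFromFlux(List<Integer> list, int batchSize) {
        List<String> events = new ArrayList<>();
        Flux.fromIterable(list).subscribe(new BaseSubscriber<Integer>() {
            private int receivedInBatch = 0;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                events.add("request(" + batchSize + ")");
                request(batchSize);
            }

            @Override
            protected void hookOnNext(Integer value) {
                events.add("onNext(" + value + ")");
                if (++receivedInBatch == batchSize) {
                    // The current batch is fully processed, so ask for the next one.
                    receivedInBatch = 0;
                    events.add("request(" + batchSize + ")");
                    request(batchSize);
                }
            }
        });
        return events;
    }

    public static void main(String[] args) {
        List<Integer> list = List.of(1, 2, 3, 4, 5);
        System.out.println(signalsFromMono(list));
        System.out.println(eventsFromFlux(list, 2));
    }
}
```

For the list 1 to 5 and a batch size of 2, the Mono delivers a single signal holding all five elements, while the Flux subscriber issues three request(2) calls and receives the elements in between them.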

We’ll explore different approaches for converting a List that already holds elements of type T. For our use case, we’ll consider two operators on Flux, fromIterable and create, to transform a List into a Flux.

3. fromIterable

Let’s first create a List of type Integer and add some values to it:

List<Integer> list = List.of(1, 2, 3);

fromIterable is an operator on the Flux Publisher that emits the items contained in the provided collection.

We’ll also use the log() operator to log each element published:

private <T> Flux<T> listToFluxUsingFromIterableOperator(List<T> list) {
    return Flux
      .fromIterable(list)
      .log();
}

Then we can apply the fromIterable operator to our Integer List and observe the behavior:

@Test
public void givenList_whenCallingFromIterableOperator_thenListItemsTransformedAsFluxAndEmitted() {

    List<Integer> list = List.of(1, 2, 3);
    Flux<Integer> flux = listToFluxUsingFromIterableOperator(list);

    StepVerifier.create(flux)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectComplete()
      .verify();
}

Finally, we use the StepVerifier API to check the elements emitted by the Flux against the elements in the List. After wrapping the Flux under test with StepVerifier.create, we use the expectNext method to confirm that the emitted items match the items in the List and arrive in the same order.
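
If we’d rather not list every expected value by hand, one possible variation is to pass the List itself to StepVerifier. The sketch below assumes reactor-core and reactor-test are on the classpath; the class FluxVerification and the method verifyEmitsExactly are hypothetical names. It relies on expectNextSequence, which checks the emitted items against an Iterable in order, and verifyComplete, which expects the completion signal and triggers the verification:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

// Hypothetical helper class, shown only to illustrate the idea.
final class FluxVerification {

    // Throws an AssertionError unless the Flux emits exactly the expected
    // elements, in the same order, and then completes.
    static <T> void verifyEmitsExactly(Flux<T> flux, List<T> expected) {
        StepVerifier.create(flux)
          .expectNextSequence(expected)
          .verifyComplete();
    }

    public static void main(String[] args) {
        List<Integer> list = List.of(1, 2, 3);
        verifyEmitsExactly(Flux.fromIterable(list), list);
    }
}
```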

4. create

The create operator on Flux enables us to create a Flux programmatically using the FluxSink API.

While fromIterable is a good option for most cases, it’s not straightforward to use when the list is delivered through a callback. In such cases, the create operator is more suitable.

Let’s create an interface for a callback:

public interface Callback<T>  {
    void onTrigger(T element);
}

Next, let’s imagine a List that’s returned from an asynchronous API call:

private void asynchronousApiCall(Callback<List<Integer>> callback) {
    Thread thread = new Thread(() -> {
      List<Integer> list = List.of(1, 2, 3);
      callback.onTrigger(list);
    });
    thread.start();
}

Now, instead of fromIterable, let’s use FluxSink inside the callback to push each item of the list into the Flux and then signal completion:

@Test
public void givenList_whenCallingCreateOperator_thenListItemsTransformedAsFluxAndEmitted() {

    Flux<Integer> flux = Flux.create(sink -> {
      Callback<List<Integer>> callback = list -> {
        list.forEach(sink::next);
        sink.complete();
      };
      asynchronousApiCall(callback);
    });

    StepVerifier.create(flux)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectComplete()
      .verify();
}
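
The same bridging logic can be pulled into a reusable helper. The sketch below is one possible shape, assuming reactor-core is available. CallbackBridge and fromCallbackApi are hypothetical names, and the Callback interface is repeated only so the snippet compiles on its own:

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;

import reactor.core.publisher.Flux;

// Hypothetical helper class; not part of Reactor or of the article's code base.
final class CallbackBridge {

    // Same shape as the Callback interface above, repeated to keep the sketch self-contained.
    interface Callback<T> {
        void onTrigger(T element);
    }

    // Adapts any API that delivers a List through a Callback into a Flux of its elements.
    static <T> Flux<T> fromCallbackApi(Consumer<Callback<List<T>>> api) {
        return Flux.create(sink -> api.accept(list -> {
            list.forEach(sink::next); // push every element downstream
            sink.complete();          // then signal that no more elements will follow
        }));
    }

    public static void main(String[] args) {
        // Stand-in for an asynchronous API that answers on another thread.
        Flux<Integer> flux = fromCallbackApi(callback ->
            new Thread(() -> callback.onTrigger(List.of(1, 2, 3))).start());

        // Wait up to five seconds for the elements to arrive.
        System.out.println(flux.collectList().block(Duration.ofSeconds(5)));
    }
}
```

Outside a test, collecting the result with a timeout, as main does here, keeps a callback that never fires from blocking the caller forever.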

5. Conclusion

In this article, we explored two ways to transform a List into a Flux: the fromIterable and create operators on Flux. The fromIterable operator works with a plain List as well as with a List wrapped in a Mono. The create operator is best suited to a List delivered through a callback.
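
For a List that’s already wrapped in a Mono, one way to reach fromIterable is through Mono’s flatMapMany operator, and flatMapIterable is a shorter alternative. The following is a minimal sketch assuming reactor-core is on the classpath, with hypothetical class and method names:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, shown only to illustrate the idea.
final class MonoListToFlux {

    // Unwraps the list once the Mono emits it and re-emits its elements one by one.
    static <T> Flux<T> usingFlatMapMany(Mono<List<T>> monoList) {
        return monoList.flatMapMany(Flux::fromIterable);
    }

    // Alternative that maps the emitted value straight to an Iterable.
    static <T> Flux<T> usingFlatMapIterable(Mono<List<T>> monoList) {
        return monoList.flatMapIterable(list -> list);
    }

    public static void main(String[] args) {
        Mono<List<Integer>> monoList = Mono.just(List.of(1, 2, 3));
        usingFlatMapMany(monoList).subscribe(System.out::println);
    }
}
```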

As always, the full source code is available over on GitHub.