1. Overview
In Reactive Programming, it’s often necessary to transform Collections into a reactive stream known as a Flux. This becomes a crucial step when integrating an existing data structure into the reactive pipeline.
In this tutorial, we’ll explore how to transform a Collection of elements into a Flux of elements.
2. Problem Definition
The two main types of *Publisher*s in Project Reactor are Flux and Mono. Mono can emit at most one value, and Flux can emit an arbitrary number of values.
When we fetch a List
However, if we put such a large list in a Flux, it allows the Subscriber to request data in manageable chunks. This enables the subscriber to process items one by one or in small batches:
We’ll be exploring different approaches for converting a List that already holds elements of type T. For our use case, we’ll be considering the operators fromIterable and create of Publisher type Flux to transform a List
3. fromIterable
Let’s first create a List of type Integer and add some values to it:
List<Integer> list = List.of(1, 2, 3);
fromIterable is an operator on the Flux Publisher that emits the items contained in the provided collection.
We’ve used the log() operator to log each element published:
private <T> Flux<T> listToFluxUsingFromIterableOperator(List<T> list) {
return Flux
.fromIterable(list)
.log();
}
Then we can apply the fromIterable operator to our Integer List and observe the behavior:
@Test
public void givenList_whenCallingFromIterableOperator_thenListItemsTransformedAsFluxAndEmitted(){
List<Integer> list = List.of(1, 2, 3);
Flux<Integer> flux = listToFluxUsingFromIterableOperator(list);
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
Finally, we used the StepVerifier API to verify the emitted elements from the Flux with the elements that were in the List. After we wrap up our Flux source, which is under test, we use the expectNext method to cross-reference whether the items emitted from the Flux and items inside the List are identical and follow the same order.
4. create
The create operator on the Flux type Publisher enables us to create a Flux using the FluxSink API programmatically.
While the fromIterable is generally a good option for most cases, it’s not straightforward to use when the list is generated by a callback. In such cases, using the create operator is more suitable.
Let’s create an interface for a callback:
public interface Callback<T> {
void onTrigger(T element);
}
Next, let’s imagine a List
private void asynchronousApiCall(Callback<List<Integer>> callback) {
Thread thread = new Thread(()-> {
List<Integer> list = List.of(1, 2,3);
callback.onTrigger(list);
});
thread.start();
}
Now, instead of fromIterable, let’s use FluxSink inside of the callback to add each of those items to the list:
@Test
public void givenList_whenCallingCreateOperator_thenListItemsTransformedAsFluxAndEmitted() {
Flux<Integer> flux = Flux.create(sink -> {
Callback<List<Integer>> callback = list -> {
list.forEach(sink::next);
sink.complete();
};
asynchronousApiCall(callback);
});
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
5. Conclusion
In this article, we explored different methods to transform a List
As always, the full source code is available over on GitHub.