1. Overview
In this tutorial, we’ll learn about aggregating exceptions in a stream pipeline.
The Stream API by itself does not provide a declarative way to handle exceptions. A pipeline has a single channel, which carries the data, and there is no separate channel for exceptions. This means we can't register a function to be invoked when an operation throws. Hence, we have to fall back to catching exceptions with a try-catch block.
As a result, aggregating exceptions in a stream pipeline and handling them can be challenging.
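To make the problem concrete, here is a minimal sketch showing that the first uncaught exception aborts the whole pipeline, so the remaining elements are never processed. The class and method names are hypothetical and not part of the tutorial's code base:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, used only for illustration
class StreamAbortDemo {

    // Returns the values that were parsed before the pipeline stopped
    static List<Integer> parseUntilFailure(String... inputs) {
        List<Integer> parsed = new ArrayList<>();
        try {
            Stream.of(inputs)
              .map(Integer::parseInt)   // throws NumberFormatException for "a"
              .forEach(parsed::add);
        } catch (NumberFormatException e) {
            // we can only react here, outside the pipeline, after it has already stopped
        }
        return parsed;
    }

    public static void main(String[] args) {
        System.out.println(parseUntilFailure("1", "2", "a", "3")); // prints [1, 2]
    }
}
```

The element "3" is never reached, and only the first failure is visible to the caller. The approaches in the following sections avoid both problems by turning exceptions into data.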
2. Aggregating Exceptions With a Try Catch Block Within the Stream Pipeline
There are often cases where a method just has to be called for its effect, for example, a simple database update that might throw an exception due to connection failure. With this in mind, let’s consider a simple example of calling processThrowsExAndNoOutput() in the pipeline:
@Test
public void givenTryCatchInPipeline_whenFoundEx_thenSuppressExIntoRuntimeEx() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    RuntimeException runEx = Arrays.stream(strings)
      .map(str -> {
          try {
              processThrowsExAndNoOutput(str);
              return null;
          } catch (RuntimeException e) {
              return e;
          }
      })
      .filter(Objects::nonNull)
      .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
          RuntimeException runtimeException = new RuntimeException("Errors Occurred");
          list.forEach(runtimeException::addSuppressed);
          return runtimeException;
      }));
    processExceptions(runEx);
    assertEquals("Errors Occurred", runEx.getMessage());
    assertEquals(3, runEx.getSuppressed().length);
}
In the above program, we treat caught exceptions as data in the stream. The map() method returns either null or the exception. With filter(), only exceptions are allowed downstream. Finally, we collect them into a single RuntimeException, attaching each one with addSuppressed(). We can then call processExceptions() to handle the aggregate exception.
That works! But could it be more declarative? Let’s work towards it in the upcoming sections.
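The test above relies on two helpers, processThrowsExAndNoOutput() and processExceptions(), whose bodies aren't shown. The following sketch fills them in under stated assumptions: the first one throws for non-numeric input, and the second one just reports every suppressed exception. Both bodies, as well as the class name, are illustrative guesses rather than the actual implementation:

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the helpers used by the test
class SuppressedExceptionsDemo {

    // Assumption: succeeds silently for numeric input and throws for anything else
    static void processThrowsExAndNoOutput(String input) {
        Integer.parseInt(input); // NumberFormatException is a RuntimeException
    }

    // Assumption: handling simply means reporting every suppressed exception
    static void processExceptions(Throwable aggregate) {
        for (Throwable suppressed : aggregate.getSuppressed()) {
            System.err.println("Process Exception " + suppressed);
        }
    }

    // The same pipeline as in the test, wrapped in a method so that it can be reused
    static RuntimeException aggregate(String... inputs) {
        return Arrays.stream(inputs)
          .map(str -> {
              try {
                  processThrowsExAndNoOutput(str);
                  return null;
              } catch (RuntimeException e) {
                  return e;
              }
          })
          .filter(Objects::nonNull)
          .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
              RuntimeException runtimeException = new RuntimeException("Errors Occurred");
              list.forEach(runtimeException::addSuppressed);
              return runtimeException;
          }));
    }

    public static void main(String[] args) {
        processExceptions(aggregate("1", "2", "3", "a", "b", "c"));
    }
}
```

With these assumptions, "a", "b" and "c" each produce a NumberFormatException, which matches the three suppressed exceptions the test asserts.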
3. Aggregating Exceptions by Extracting the Try Catch Block Into a Method
Let’s make the implementation a little more readable and concise. To do that, we move the try-catch block into a method:
static Throwable callProcessThrowsExAndNoOutput(String input) {
    try {
        processThrowsExAndNoOutput(input);
        return null;
    } catch (RuntimeException e) {
        return e;
    }
}
Now, we can call the above method from inside the pipeline:
@Test
public void givenExtractedMethod_whenFoundEx_thenSuppressExIntoRuntimeEx() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    // the stream holds Throwable elements, so reduce() returns a Throwable
    Throwable runEx = Arrays.stream(strings)
      .map(str -> callProcessThrowsExAndNoOutput(str))
      .filter(Objects::nonNull)
      .reduce(new RuntimeException("Errors Occurred"), (o1, o2) -> {
          o1.addSuppressed(o2);
          return o1;
      });
    // handle the aggregate exception as before
}
The above approach looks cleaner. However, there is still room for improvement and more use cases to discuss.
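One caveat: the reduce() call above mutates its identity value. That works on a sequential stream, but reduce() expects an immutable identity, and on a parallel stream the shared exception could end up being added to itself. A possible alternative is a small collector, since collect() is designed for mutable accumulation. The following is a sketch only, and the names attempt() and toAggregate() are hypothetical:

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collector;
import java.util.stream.Stream;

class SuppressingCollectorDemo {

    // Hypothetical helper: returns the exception instead of throwing it, or null on success
    static Throwable attempt(String input) {
        try {
            Integer.parseInt(input);
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    // Every (sub)pipeline gets its own fresh RuntimeException from the supplier
    static Collector<Throwable, ?, RuntimeException> toAggregate(String message) {
        return Collector.<Throwable, RuntimeException>of(
          () -> new RuntimeException(message),
          RuntimeException::addSuppressed,
          (left, right) -> {
              // merge the partial aggregates produced by a parallel stream
              for (Throwable t : right.getSuppressed()) {
                  left.addSuppressed(t);
              }
              return left;
          });
    }

    static RuntimeException aggregate(boolean parallel, String... inputs) {
        Stream<String> source = Arrays.stream(inputs);
        if (parallel) {
            source = source.parallel();
        }
        return source
          .map(SuppressingCollectorDemo::attempt)
          .filter(Objects::nonNull)
          .collect(toAggregate("Errors Occurred"));
    }

    public static void main(String[] args) {
        RuntimeException runEx = aggregate(true, "1", "2", "3", "a", "b", "c");
        System.out.println(runEx.getSuppressed().length); // prints 3
    }
}
```

Because the supplier creates a new aggregate for each partial result, the same collector can be used for sequential and parallel streams.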
4. Aggregating Exceptions and Output in the Stream Pipeline Using a Runtime Type Check
Most programs have to handle both exceptions and expected output. Let’s take an example of a method which can return either an exception or some output:
static Object processReturnsExAndOutput(String input) {
    try {
        return Integer.parseInt(input);
    } catch (Exception e) {
        return new RuntimeException("Exception in processReturnsExAndOutput for " + input, e);
    }
}
Now, let’s look at the pipeline processing:
@Test
public void givenProcessMethod_whenStreamResultHasExAndOutput_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    Map map = strings.stream()
      .map(s -> processReturnsExAndOutput(s))
      .collect(Collectors.partitioningBy(o -> o instanceof RuntimeException, Collectors.toList()));
    List<RuntimeException> exceptions = (List<RuntimeException>) map.getOrDefault(Boolean.TRUE, List.of());
    List<Integer> results = (List<Integer>) map.getOrDefault(Boolean.FALSE, List.of());
    handleExceptionsAndOutputs(exceptions, results);
}
The above stream pipeline uses partitioningBy() in the terminal collect(). It relies on a runtime type check with instanceof to partition the results into a list of exceptions and a list of integers. The program then calls handleExceptionsAndOutputs() to process both lists.
This time, we didn’t reduce the exceptions into an aggregate RuntimeException. Instead, we passed on the list of exceptions for further processing. This is another way of aggregating the exceptions.
As we can see, it is definitely not the cleanest of the approaches, with raw types and casting required. Hence, the upcoming sections will explore more generalized solutions to address the issue at hand.
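As a stopgap, the raw Map and the unchecked casts can be avoided by keeping the partitioned values as Object and converting each list with Class.cast(). This is only a sketch: castAll() and partition() are hypothetical helpers, and processReturnsExAndOutput() is repeated so that the example is self-contained:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TypedPartitionDemo {

    // Same method as in the article, copied here for completeness
    static Object processReturnsExAndOutput(String input) {
        try {
            return Integer.parseInt(input);
        } catch (Exception e) {
            return new RuntimeException("Exception in processReturnsExAndOutput for " + input, e);
        }
    }

    // true -> exceptions, false -> regular output
    static Map<Boolean, List<Object>> partition(List<String> inputs) {
        return inputs.stream()
          .map(TypedPartitionDemo::processReturnsExAndOutput)
          .collect(Collectors.partitioningBy(o -> o instanceof RuntimeException));
    }

    // Hypothetical helper: checked conversion of every element to the given type
    static <T> List<T> castAll(List<Object> items, Class<T> type) {
        return items.stream()
          .map(type::cast)
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Boolean, List<Object>> byType = partition(Arrays.asList("1", "2", "3", "a", "b", "c"));
        List<RuntimeException> exceptions = castAll(byType.get(true), RuntimeException.class);
        List<Integer> results = castAll(byType.get(false), Integer.class);
        System.out.println(results);           // prints [1, 2, 3]
        System.out.println(exceptions.size()); // prints 3
    }
}
```

The map returned by partitioningBy() always contains both the true and the false key, so a plain get() is enough here. The pipeline still depends on instanceof, though, which is what the next sections move away from.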
5. Aggregating Exceptions and Output Using a Custom Mapper
Going forward, we are going to focus more on functional programming.
We’ll create a custom mapper function that wraps the function we would otherwise pass to map(). It returns a Result object that encapsulates either the result or the exception.
First, let’s look at the Result class:
public class Result<R, E extends Throwable> {
    private Optional<R> result;
    private Optional<E> exception;

    public Result(R result) {
        this.result = Optional.of(result);
        this.exception = Optional.empty();
    }

    public Result(E exception) {
        this.exception = Optional.of(exception);
        this.result = Optional.empty();
    }

    public Optional<R> getResult() {
        return result;
    }

    public Optional<E> getException() {
        return exception;
    }
}
The Result class uses generics and Optional. Since a Result holds either a result or an exception, one of the two is always absent, so we wrap both in Optional. Its usage will become clearer as we move on.
We mentioned the custom mapper at the beginning of this section. Let’s now look at its implementation:
public class CustomMapper {
    public static <T, R> Function<T, Result<R, Throwable>> mapper(Function<T, R> func) {
        return arg -> {
            try {
                return new Result(func.apply(arg));
            } catch (Exception e) {
                return new Result(e);
            }
        };
    }
}
Now it is time to see the mapper() in action:
@Test
public void givenCustomMapper_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    strings.stream()
      .map(CustomMapper.mapper(Integer::parseInt))
      .collect(Collectors.collectingAndThen(Collectors.toList(),
        list -> handleErrorsAndOutputForResult(list)));
}
This time, we used Collectors.collectingAndThen() to invoke handleErrorsAndOutputForResult() at the end of the pipeline with the list of Result<Integer, Throwable> objects. Let’s take a look at handleErrorsAndOutputForResult():
static String handleErrorsAndOutputForResult(List<Result<Integer, Throwable>> successAndErrors) {
    logger.info("handle errors and output");
    successAndErrors.forEach(result -> {
        if (result.getException().isPresent()) {
            logger.error("Process Exception " + result.getException().get());
        } else {
            logger.info("Process Result " + result.getResult().get());
        }
    });
    return "Errors and Output Handled";
}
As shown above, we simply iterate over the Result list and fork into a success or failure flow with the help of the method Optional.isPresent(). This can be a useful approach when the success and error cases have to be dealt with distinctly, e.g. sending notifications to separate users.
When the function to be used inside Stream.map() cannot be modified, for example, because it is from an external library, we can use our custom mapper() function to wrap it and handle the outcome in a more generalized manner.
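Library methods often declare checked exceptions, which a plain Function cannot accept. The same wrapping idea can be extended to cover that case. The following is a minimal sketch under stated assumptions: ThrowingFunction, Outcome (a trimmed-down stand-in for Result) and parseStrictly() are all hypothetical names introduced for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class CheckedMapperDemo {

    // Hypothetical interface: like Function, but allowed to throw checked exceptions
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T arg) throws Exception;
    }

    // Trimmed-down stand-in for the Result class: holds a value or an error
    static final class Outcome<R> {
        private final R value;
        private final Throwable error;

        private Outcome(R value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        Optional<R> getResult() {
            return Optional.ofNullable(value);
        }

        Optional<Throwable> getException() {
            return Optional.ofNullable(error);
        }
    }

    // Wraps a throwing function so that it can be passed to Stream.map()
    static <T, R> Function<T, Outcome<R>> mapper(ThrowingFunction<T, R> func) {
        return arg -> {
            try {
                return new Outcome<>(func.apply(arg), null);
            } catch (Exception e) {
                return new Outcome<>(null, e);
            }
        };
    }

    // Hypothetical example of a method that declares a checked exception
    static Integer parseStrictly(String input) throws Exception {
        if (!input.matches("\\d+")) {
            throw new Exception("Not a number: " + input);
        }
        return Integer.valueOf(input);
    }

    static long countFailures(List<String> inputs) {
        return inputs.stream()
          .map(mapper(CheckedMapperDemo::parseStrictly))
          .filter(outcome -> outcome.getException().isPresent())
          .count();
    }

    public static void main(String[] args) {
        System.out.println(countFailures(Arrays.asList("1", "2", "3", "a", "b", "c"))); // prints 3
    }
}
```

The pipeline itself stays free of try-catch blocks, and the wrapped method doesn't need to change.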
6. Aggregating Exceptions and Output Using a Custom Collector
Aggregating the exceptions and output of a pipeline is a kind of collection activity. Hence it makes sense to implement a collector, which is designed for this purpose.
Let’s see how to do that:
public class CustomCollector<T, R> {
    private final List<R> results = new ArrayList<>();
    private final List<Throwable> exceptions = new ArrayList<>();

    public static <T, R> Collector<T, ?, CustomCollector<T, R>> of(Function<T, R> mapper) {
        return Collector.of(
          CustomCollector::new,
          (collector, item) -> {
              try {
                  R result = mapper.apply(item);
                  collector.results.add(result);
              } catch (Exception e) {
                  collector.exceptions.add(e);
              }
          },
          (left, right) -> {
              left.results.addAll(right.results);
              left.exceptions.addAll(right.exceptions);
              return left;
          }
        );
    }
    // standard getters...
}
Finally, let’s see the collector in action:
@Test
public void givenCustomCollector_whenStreamResultHasExAndSuccess_thenHandleAggrExceptionAndResults() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    Arrays.stream(strings)
      .collect(Collectors.collectingAndThen(CustomCollector.of(Integer::parseInt),
        col -> handleExAndResults(col.getExceptions(), col.getResults())));
}
Here, the collector applies Integer.parseInt() to each element itself, so the pipeline needs no separate map() step. Successful conversions end up in getResults(), failures end up in getExceptions(), and collectingAndThen() passes both lists on to handleExAndResults().
7. Aggregating Exceptions and Output Using Try and Either From the Vavr Library
Try is a container that holds either an uncaught exception or the actual output in case of success. Just like the custom mapper discussed earlier, Try can also wrap functions.
Either, on the other hand, is a more general container that holds either an error type or the expected output type.
Let’s see how we can exploit these features together:
@Test
public void givenVavrEitherAndTry_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    strings.stream()
      .map(str -> Try.of(() -> Integer.parseInt(str)).toEither())
      .collect(Collectors.collectingAndThen(Collectors.partitioningBy(Either::isLeft, Collectors.toList()),
        map -> handleErrorsAndOutputForEither(map)));
}
As we can see, the program converts the Try object into an Either and then collects it into a map to invoke handleErrorsAndOutputForEither():
static void handleErrorsAndOutputForEither(Map<Boolean, List<Either<Throwable, Integer>>> map) {
    logger.info("handle errors and output");
    map.getOrDefault(Boolean.TRUE, List.of())
      .forEach(either -> logger.error("Process Exception " + either.getLeft()));
    map.getOrDefault(Boolean.FALSE, List.of())
      .forEach(either -> logger.info("Process Result " + either.get()));
}
As shown above, we read the exceptions from the left side of each Either with getLeft() and the output from the right side with get(). Of the approaches we've seen in this tutorial, Try and Either give us the most concise solution.
8. Conclusion
In this tutorial, we explored a few ways to aggregate runtime exceptions while processing a stream. While many approaches are possible, it is important to maintain the essence of stream processing, including conciseness, immutability, and declarative syntax.
As usual, the code is available over on GitHub.