1. 概述

本文将探讨如何在Stream管道中聚合异常处理。Stream API本身没有提供声明式的异常处理机制——它只有单通道处理数据,没有独立的异常处理通道。这意味着当遇到异常时,无法直接调用处理函数,我们只能退回到传统的try-catch块。

因此,在Stream管道中聚合和处理异常会变得棘手。本文将介绍几种实用的解决方案。

2. 在Stream管道内使用try-catch块聚合异常

常见场景:需要调用仅产生副作用的方法(如数据库更新),可能因连接失败抛出异常。以下示例调用processThrowsExAndNoOutput()处理字符串数组:

@Test
public void givenTryCatchInPipeline_whenFoundEx_thenSuppressExIntoRuntimeEx() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    RuntimeException runEx = Arrays.stream(strings)
      .map(str -> {
          try {
              processThrowsExAndNoOutput(str);
              return null;
          } catch (RuntimeException e) {
              return e;
          }
      })
      .filter(Objects::nonNull)
      .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
          RuntimeException runtimeException = new RuntimeException("Errors Occurred");
          list.forEach(runtimeException::addSuppressed);
          return runtimeException;
      }));
    processExceptions(runEx);
    assertEquals("Errors Occurred", runEx.getMessage());
    assertEquals(3, runEx.getSuppressed().length);
}

核心思路:将捕获的异常作为流数据map()返回null或异常对象,filter()只保留异常,最后通过addSuppressed()聚合为RuntimeException。虽然可行,但代码不够声明式。

3. 通过提取try-catch块到方法中聚合异常

优化方案:将try-catch块提取为独立方法提升可读性:

static Throwable callProcessThrowsExAndNoOutput(String input) {
    try {
        processThrowsExAndNoOutput(input);
        return null;
    } catch (RuntimeException e) {
        return e;
    }
}

简化后的管道调用:

@Test
public void givenExtractedMethod_whenFoundEx_thenSuppressExIntoRuntimeEx() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    RuntimeException runEx = Arrays.stream(strings)
      .map(str -> callProcessThrowsExAndNoOutput(str))
      .filter(Objects::nonNull)
      .reduce(new RuntimeException("Errors Occurred"), (o1, o2) -> {
          o1.addSuppressed(o2);
          return o1;
      });
    // 处理聚合异常(同上)
}

代码更简洁,但仍有优化空间。

4. 使用反射在Stream管道中聚合异常和输出

实际开发常需同时处理异常和正常输出。考虑以下方法:

static Object processReturnsExAndOutput(String input) {
    try {
        return Integer.parseInt(input);
    } catch (Exception e) {
        return new RuntimeException("Exception in processReturnsExAndOutput for " + input, e);
    }
}

管道处理方案:

@Test
public void givenProcessMethod_whenStreamResultHasExAndOutput_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    Map map = strings.stream()
      .map(s -> processReturnsExAndOutput(s))
      .collect(Collectors.partitioningBy(o -> o instanceof RuntimeException, Collectors.toList()));
    
    List<RuntimeException> exceptions = (List<RuntimeException>) map.getOrDefault(Boolean.TRUE, List.of());
    List<Integer> results = (List<Integer>) map.getOrDefault(Boolean.FALSE, List.of());
    handleExceptionsAndOutputs(exceptions, results);
}

关键点:使用partitioningBy()通过反射分区结果。未聚合为单个异常,而是传递异常列表供后续处理。缺点是需要原始类型转换,不够优雅。

5. 使用自定义Mapper聚合异常和输出

转向函数式编程方案:创建自定义Mapper封装map()函数,返回包含结果和异常的Result对象。

Result类设计:

public class Result<R, E extends Throwable> {
    private Optional<R> result;
    private Optional<E> exception;

    public Result(R result) {
        this.result = Optional.of(result);
        this.exception = Optional.empty();
    }

    public Result(E exception) {
        this.exception = Optional.of(exception);
        this.result = Optional.empty();
    }

    public Optional<R> getResult() {
        return result;
    }

    public Optional<E> getException() {
        return exception;
    }
}

自定义Mapper实现:

public class CustomMapper {
    public static <T, R> Function<T, Result<R, Throwable>> mapper(Function<T, R> func) {
        return arg -> {
            try {
                return new Result(func.apply(arg));
            } catch (Exception e) {
                return new Result(e);
            }
        };
    }
}

使用示例:

@Test
public void givenCustomMapper_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c");
    strings.stream()
      .map(CustomMapper.mapper(Integer::parseInt))
      .collect(Collectors.collectingAndThen(Collectors.toList(),
        list -> handleErrorsAndOutputForResult(list)));
}

处理函数:

static String handleErrorsAndOutputForResult(List<Result<Integer, Throwable>> successAndErrors) {
    logger.info("handle errors and output");
    successAndErrors.forEach(result -> {
        if (result.getException().isPresent()) {
            logger.error("Process Exception " + result.getException().get());
        } else {
            logger.info("Process Result" + result.getResult().get());
        }
    });
    return "Errors and Output Handled";
}

优势:当Stream.map()中的函数不可修改(如外部库)时,可用此方案统一封装处理。

6. 使用自定义Collector聚合异常和输出

聚合本质是收集操作,适合用自定义Collector实现:

public class CustomCollector<T, R> {
    private final List<R> results = new ArrayList<>();
    private final List<Throwable> exceptions = new ArrayList<>();

    public static <T, R> Collector<T, ?, CustomCollector<T, R>> of(Function<T, R> mapper) {
        return Collector.of(
          CustomCollector::new,
          (collector, item) -> {
              try {
                  R result = mapper.apply(item);
                  collector.results.add(result);
              } catch (Exception e) {
                  collector.exceptions.add(e);
              }
          },
          (left, right) -> {
              left.results.addAll(right.results);
              left.exceptions.addAll(right.exceptions);
              return left;
          }
        );
    }
    // 标准getter方法...
}

使用示例:

@Test
public void givenCustomCollector_whenStreamResultHasExAndSuccess_thenHandleAggrExceptionAndResults() {
    String[] strings = {"1", "2", "3", "a", "b", "c"};
    Arrays.stream(strings)
      .collect(Collectors.collectingAndThen(CustomCollector.of(Integer::parseInt),
        col -> handleExAndResults(col.getExceptions(), col.getResults())));
}

7. 使用Vavr库的Try和Either聚合异常和输出

Vavr的Try容器封装成功结果或未捕获异常,类似自定义Mapper。**Either是更通用的容器,可持有错误类型或成功类型**。

组合使用方案:

@Test
public void givenVavrEitherAndTry_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
    List<String> strings = List.of("1", "2", "3", "a", "b", "c"};
    strings.stream()
      .map(str -> Try.of(() -> Integer.parseInt(str)).toEither())
      .collect(Collectors.collectingAndThen(Collectors.partitioningBy(Either::isLeft, Collectors.toList()),
        map -> handleErrorsAndOutputForEither(map)));
}

处理函数:

static void handleErrorsAndOutputForEither(Map<Boolean, List<Either<Throwable, Integer>>> map) {
    logger.info("handle errors and output");
    map.getOrDefault(Boolean.TRUE, List.of())
      .forEach(either -> logger.error("Process Exception " + either.getLeft()));
    map.getOrDefault(Boolean.FALSE, List.of())
      .forEach(either -> logger.info("Process Result " + either.get()));
}

优势:通过Either的左右分支处理异常和结果,代码最简洁。

8. 总结

本文探讨了Java Stream中聚合运行时异常的多种方案。选择方案时需权衡:

  • ✅ 基础方案(try-catch)简单但不够声明式
  • ✅ 自定义Mapper/Collector提升可读性
  • ✅ Vavr库提供最简洁的函数式方案

核心原则:保持Stream处理的本质特性——简洁性、不可变性和声明式语法。完整代码示例可在GitHub查看(联系邮箱:dev@example.com)。


原始标题:Aggregate Runtime Exceptions in Java Streams