1. 概述

当我们在分布式云环境中构建应用程序时,我们需要针对故障进行设计。这通常涉及重试。

Spring WebFlux 为我们提供了一些重试失败操作的工具。

在本教程中,我们将了解如何向 Spring WebFlux 应用程序添加和配置重试。

2. 使用案例

对于我们的示例,我们将使用 MockWebServer 并模拟外部系统暂时不可用然后变得可用。

让我们为连接到此 REST 服务的组件创建一个简单的测试:

@Test
void givenExternalServiceReturnsError_whenGettingData_thenRetryAndReturnResponse() {

    mockExternalService.enqueue(new MockResponse()
      .setResponseCode(SERVICE_UNAVAILABLE.code()));
    mockExternalService.enqueue(new MockResponse()
      .setResponseCode(SERVICE_UNAVAILABLE.code()));
    mockExternalService.enqueue(new MockResponse()
      .setResponseCode(SERVICE_UNAVAILABLE.code()));
    mockExternalService.enqueue(new MockResponse()
      .setBody("stock data"));

    StepVerifier.create(externalConnector.getData("ABC"))
      .expectNextMatches(response -> response.equals("stock data"))
      .verifyComplete();

    verifyNumberOfGetRequests(4);
}

3.添加重试

MonoFlux API 中内置了两个关键的重试运算符。

3.1.使用 重试

首先,我们使用 retry 方法,它可以防止应用程序立即返回错误并重新订阅指定的次数:

public Mono<String> getData(String stockId) {
    return webClient.get()
        .uri(PATH_BY_ID, stockId)
        .retrieve()
        .bodyToMono(String.class)
        .retry(3);
}

无论 Web 客户端返回什么错误,这都会重试最多 3 次。

3.2.使用 重试时间

接下来,让我们尝试使用 retryWhen 方法的可配置策略:

public Mono<String> getData(String stockId) {
    return webClient.get()
        .uri(PATH_BY_ID, stockId)
        .retrieve()
        .bodyToMono(String.class)
        .retryWhen(Retry.max(3));
}

这允许我们配置一个 Retry 对象来描述所需的逻辑。

在这里,我们使用 最大 策略来重试最多尝试次数。这相当于我们的第一个示例,但允许我们更多的配置选项。特别要注意的是,在这种情况下, 每次重试都会尽可能快地发生

4. 添加延迟

立即重试的主要缺点是,这不会给失败的服务提供恢复时间。它可能会压垮它,使问题变得更糟并减少恢复的机会。

4.1.使用 固定延迟 重试

我们可以使用 fixedDelay 策略在每次尝试之间添加延迟:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
}

此配置允许尝试之间有两秒的延迟,这可能会增加成功的机会。但是,如果服务器中断时间较长,那么我们应该等待更长时间。但是,如果我们将所有延迟配置为很长一段时间,那么短暂的信号会进一步减慢我们的服务速度。

4.2.回 退重

我们可以使用 退避 策略,而不是按固定间隔重试:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
}

实际上,这 会逐渐增加尝试之间的延迟 - 在我们的示例中大约为 2 秒、4 秒,然后是 8 秒。这 使外部系统有更好的机会从常见的连接问题中恢复 或处理积压的工作。

4.3.重试时出现 抖动

退避 策略的另一个好处是它为计算的延迟间隔增加了随机性或抖动。因此, 抖动可以帮助减少多个客户端同步重试的重试风暴

默认情况下,该值设置为 0.5,这对应于计算延迟的最多 50% 的抖动。

让我们使用 jitter 方法配置不同的值 0.75,以表示最多 75% 的计算延迟的抖动:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .accept(MediaType.APPLICATION_JSON)
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));
}

我们应该注意,可能的值范围在 0(无抖动)和 1(抖动最多为计算延迟的 100%)之间。

5. 过滤错误

此时,服务中的任何错误都将导致重试尝试,包括 4xx 错误,例如 400:Bad Request401:Unauthorized

显然,我们不应该重试此类客户端错误,因为服务器响应不会有任何不同。因此,让我们看看如何 仅在出现特定错误的情况下应用重试策略

首先,我们创建一个异常来表示服务器错误:

public class ServiceException extends RuntimeException {
    
    public ServiceException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }
}

接下来,我们将创建一个错误 Mono, 其中包含 5xx 错误的异常,并使用 过滤器 方法来配置我们的策略:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .onStatus(HttpStatus::is5xxServerError, 
          response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
          .filter(throwable -> throwable instanceof ServiceException));
}

现在,我们仅在 WebClient 管道中抛出 ServiceException 时重试。

6. 处理耗尽重试

最后,我们可以解释所有重试尝试均不成功的可能性。在这种情况下,该策略的默认行为是传播 RetryExhaustedException ,包装最后一个错误。

相反,让我们使用 onRetryExhaustedThrow 方法覆盖此行为,并为 ServiceException 提供生成器:

public Mono<String> getData(String stockId) {
    return webClient.get()
      .uri(PATH_BY_ID, stockId)
      .retrieve()
      .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
          .filter(throwable -> throwable instanceof ServiceException)
          .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
              throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value());
          }));
}

现在,在一系列失败的重试结束后,请求将失败并出现 ServiceException

七、结论

在本文中,我们了解了如何使用 retryretryWhen 方法在 Spring WebFlux 应用程序中添加重试。

最初,我们为失败的操作添加了最大重试次数。然后,我们通过使用和配置各种策略在尝试之间引入延迟。

最后,我们研究了重试某些错误以及在所有尝试都用尽后自定义行为。

与往常一样,完整的源代码可以在 GitHub 上获取。