1. 概述
当我们在分布式云环境中构建应用程序时,我们需要针对故障进行设计。这通常涉及重试。
Spring WebFlux 为我们提供了一些重试失败操作的工具。
在本教程中,我们将了解如何向 Spring WebFlux 应用程序添加和配置重试。
2. 使用案例
对于我们的示例,我们将使用 MockWebServer 并模拟外部系统暂时不可用然后变得可用。
让我们为连接到此 REST 服务的组件创建一个简单的测试:
@Test
void givenExternalServiceReturnsError_whenGettingData_thenRetryAndReturnResponse() {
mockExternalService.enqueue(new MockResponse()
.setResponseCode(SERVICE_UNAVAILABLE.code()));
mockExternalService.enqueue(new MockResponse()
.setResponseCode(SERVICE_UNAVAILABLE.code()));
mockExternalService.enqueue(new MockResponse()
.setResponseCode(SERVICE_UNAVAILABLE.code()));
mockExternalService.enqueue(new MockResponse()
.setBody("stock data"));
StepVerifier.create(externalConnector.getData("ABC"))
.expectNextMatches(response -> response.equals("stock data"))
.verifyComplete();
verifyNumberOfGetRequests(4);
}
3.添加重试
Mono 和 Flux API 中内置了两个关键的重试运算符。
3.1.使用 重试
首先,我们使用 retry 方法,它可以防止应用程序立即返回错误并重新订阅指定的次数:
public Mono<String> getData(String stockId) {
return webClient.get()
.uri(PATH_BY_ID, stockId)
.retrieve()
.bodyToMono(String.class)
.retry(3);
}
无论 Web 客户端返回什么错误,这都会重试最多 3 次。
3.2.使用 重试时间
接下来,让我们尝试使用 retryWhen 方法的可配置策略:
public Mono<String> getData(String stockId) {
return webClient.get()
.uri(PATH_BY_ID, stockId)
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.max(3));
}
这允许我们配置一个 Retry 对象来描述所需的逻辑。
在这里,我们使用 最大 策略来重试最多尝试次数。这相当于我们的第一个示例,但允许我们更多的配置选项。特别要注意的是,在这种情况下, 每次重试都会尽可能快地发生 。
4. 添加延迟
立即重试的主要缺点是,这不会给失败的服务提供恢复时间。它可能会压垮它,使问题变得更糟并减少恢复的机会。
4.1.使用 固定延迟 重试
我们可以使用 fixedDelay 策略在每次尝试之间添加延迟:
public Mono<String> getData(String stockId) {
return webClient.get()
.uri(PATH_BY_ID, stockId)
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));
}
此配置允许尝试之间有两秒的延迟,这可能会增加成功的机会。但是,如果服务器中断时间较长,那么我们应该等待更长时间。但是,如果我们将所有延迟配置为很长一段时间,那么短暂的信号会进一步减慢我们的服务速度。
4.2.回 退重 试
我们可以使用 退避 策略,而不是按固定间隔重试:
public Mono<String> getData(String stockId) {
return webClient.get()
.uri(PATH_BY_ID, stockId)
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
}
实际上,这 会逐渐增加尝试之间的延迟 - 在我们的示例中大约为 2 秒、4 秒,然后是 8 秒。这 使外部系统有更好的机会从常见的连接问题中恢复 或处理积压的工作。
4.3.重试时出现 抖动
退避 策略的另一个好处是它为计算的延迟间隔增加了随机性或抖动。因此, 抖动可以帮助减少多个客户端同步重试的重试风暴 。
默认情况下,该值设置为 0.5,这对应于计算延迟的最多 50% 的抖动。
让我们使用 jitter 方法配置不同的值 0.75,以表示最多 75% 的计算延迟的抖动:
public Mono<String> getData(String stockId) {
return webClient.get()
.uri(PATH_BY_ID, stockId)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).jitter(0.75));
}
我们应该注意,可能的值范围在 0(无抖动)和 1(抖动最多为计算延迟的 100%)之间。
5. 过滤错误
此时,服务中的任何错误都将导致重试尝试,包括 4xx 错误,例如 400:Bad Request 或 401:Unauthorized 。
显然,我们不应该重试此类客户端错误,因为服务器响应不会有任何不同。因此,让我们看看如何 仅在出现特定错误的情况下应用重试策略 。
首先,我们创建一个异常来表示服务器错误:
public class ServiceException extends RuntimeException {
public ServiceException(String message, int statusCode) {
super(message);
this.statusCode = statusCode;
}
}
接下来,我们将创建一个错误 Mono, 其中包含 5xx 错误的异常,并使用 过滤器 方法来配置我们的策略:
public Mono<String> getData(String stockId) {
return webClient.get()
.uri(PATH_BY_ID, stockId)
.retrieve()
.onStatus(HttpStatus::is5xxServerError,
response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
.bodyToMono(String.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
.filter(throwable -> throwable instanceof ServiceException));
}
现在,我们仅在 WebClient 管道中抛出 ServiceException 时重试。
6. 处理耗尽重试
最后,我们可以解释所有重试尝试均不成功的可能性。在这种情况下,该策略的默认行为是传播 RetryExhaustedException ,包装最后一个错误。
相反,让我们使用 onRetryExhaustedThrow 方法覆盖此行为,并为 ServiceException 提供生成器:
public Mono<String> getData(String stockId) {
return webClient.get()
.uri(PATH_BY_ID, stockId)
.retrieve()
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode())))
.bodyToMono(String.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
.filter(throwable -> throwable instanceof ServiceException)
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value());
}));
}
现在,在一系列失败的重试结束后,请求将失败并出现 ServiceException 。
七、结论
在本文中,我们了解了如何使用 retry 和 retryWhen 方法在 Spring WebFlux 应用程序中添加重试。
最初,我们为失败的操作添加了最大重试次数。然后,我们通过使用和配置各种策略在尝试之间引入延迟。
最后,我们研究了重试某些错误以及在所有尝试都用尽后自定义行为。
与往常一样,完整的源代码可以在 GitHub 上获取。