1. 概述
在消息系统中,消息确认是一种标准机制,它向消息代理发送信号,表明消息已接收,不应再进行重发。在亚马逊的简单队列服务(SQS)中,确认是通过删除队列中的消息来执行的。
在这个教程中,我们将探讨Spring Cloud AWS SQS v3提供的三种预设确认模式:ON_SUCCESS
、MANUAL
和ALWAYS
。我们将使用事件驱动的场景来展示这些用例,并利用《Spring Cloud AWS SQS V3入门》文章中的环境和测试设置。
2. 依赖项
首先,我们需要导入Spring Cloud AWS Bill of Materials,确保pom.xml
中的所有依赖项兼容:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
接着,添加核心和SQS启动器依赖:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
最后,为了测试,我们需要添加LocalStack和TestContainers的依赖,以及JUnit 5、awaitility库用于验证异步消息消费,以及AssertJ处理断言:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
3. 配置本地测试环境
首先,我们将使用Testcontainers配置一个LocalStack环境,用于本地测试:
@Testcontainers
public class BaseSqsLiveTest {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
}
}
尽管这种设置让测试变得简单且可重复,但请注意,本教程中的代码也可以直接针对AWS运行。
4. 设置队列名称
5. 成功处理后的确认
注意,测试完成后可能会出现“连接拒绝”错误——这是因为Docker容器在框架停止轮询消息之前就停止了。我们可以安全地忽略这些错误。
6. 手动确认
框架支持手动确认消息,这对于需要对确认过程有更多控制的场景非常有用。
6.1. 创建监听器
为了演示这一点,我们将创建一个异步场景,其中InventoryService
有一个慢速连接,我们希望在完成前释放监听线程:
@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
.thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
.thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
.thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
logger.info("Releasing processing thread.");
}
在这个逻辑中,我们使用Java的CompletableFuture
异步运行库存检查。我们在监听方法中添加了一个Acknowledge
对象,并在注解的acknowledgementMode
属性中设置了SqsListenerAcknowledgementMode.MANUAL
。这个属性是一个String
,接受属性占位符和SpEL。当我们将AcknowledgementMode
设置为MANUAL
时,才会提供Acknowledgement
对象。
请注意,此示例利用了Spring Boot的自动配置,提供了合理的默认值,以及@SqsListener
注解属性来切换确认模式。另一种选择是声明一个SqsMessageListenerContainerFactory
bean,以便设置更复杂的配置。
6.2. 模拟慢速连接
现在,让我们在InventoryService
类中添加slowCheckInventory
方法,使用Thread.sleep
模拟慢速连接:
public void slowCheckInventory(UUID productId, int quantity) {
simulateBusyConnection();
checkInventory(productId, quantity);
}
private void simulateBusyConnection() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
6.3. 测试
接下来,编写我们的测试:
@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.PROCESSED));
assertQueueIsEmpty(queueName, "async-order-processing-container");
}
这次,我们请求的是库存中有可用数量的产品,所以不应抛出任何错误。
运行测试时,我们会看到收到消息的日志信息:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]
然后,我们会看到线程释放消息:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Releasing processing thread.
这是因为我们异步处理并确认了消息。大约两秒后,我们应该看到消息已被确认的日志:
INFO 2786 --- [onPool-worker-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged
最后,我们会看到停止容器和断言队列为空的日志:
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue
7. 无论成功或错误都确认
我们将探讨的最后一种确认模式是ALWAYS
,它会导致框架无论监听方法是否抛出异常都会确认消息。
7.1. 创建监听器
让我们模拟一次销售事件,库存有限,我们不希望重新处理任何消息,不论出现何种失败。我们将在application.yml
中早先定义的属性中设置确认模式为ALWAYS
:
@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
logger.info("Message received: {}", orderCreatedEvent);
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
logger.info("Message processed: {}", orderCreatedEvent);
}
在测试中,我们将创建一个超出库存数量的订单:
7.2. 测试
@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.RECEIVED));
assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}
现在,即使OutOfStockException
被抛出,消息也会被确认,不会尝试重新发送消息:
Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue
8. 总结
在这篇文章中,我们使用了一个事件驱动的场景,展示了Spring Cloud AWS v3 SQS集成提供的三种确认模式:ON_SUCCESS
(默认)、MANUAL
和ALWAYS
。我们利用了自动配置设置,并使用@SqsListener
注解属性在模式之间切换。我们还创建了现场测试,使用Testcontainers和LocalStack来验证行为。