1. 概述

在消息系统中,消息确认是一种标准机制,它向消息代理发送信号,表明消息已接收,不应再进行重发。在亚马逊的简单队列服务(SQS)中,确认是通过删除队列中的消息来执行的。

在这个教程中,我们将探讨Spring Cloud AWS SQS v3提供的三种预设确认模式:ON_SUCCESSMANUALALWAYS。我们将使用事件驱动的场景来展示这些用例,并利用《Spring Cloud AWS SQS V3入门》文章中的环境和测试设置。

2. 依赖项

首先,我们需要导入Spring Cloud AWS Bill of Materials,确保pom.xml中的所有依赖项兼容:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.1.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

接着,添加核心和SQS启动器依赖:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

最后,为了测试,我们需要添加LocalStack和TestContainers的依赖,以及JUnit 5、awaitility库用于验证异步消息消费,以及AssertJ处理断言:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <scope>test</scope>
</dependency>

3. 配置本地测试环境

首先,我们将使用Testcontainers配置一个LocalStack环境,用于本地测试:

@Testcontainers
public class BaseSqsLiveTest {

    private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

    @Container
    static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
        registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
        registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
        registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
          .toString());
    }
}

尽管这种设置让测试变得简单且可重复,但请注意,本教程中的代码也可以直接针对AWS运行。

4. 设置队列名称

5. 成功处理后的确认

注意,测试完成后可能会出现“连接拒绝”错误——这是因为Docker容器在框架停止轮询消息之前就停止了。我们可以安全地忽略这些错误。

6. 手动确认

框架支持手动确认消息,这对于需要对确认过程有更多控制的场景非常有用。

6.1. 创建监听器

为了演示这一点,我们将创建一个异步场景,其中InventoryService有一个慢速连接,我们希望在完成前释放监听线程:

@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
    CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
      .thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
      .thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
      .thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
    logger.info("Releasing processing thread.");
}

在这个逻辑中,我们使用Java的CompletableFuture异步运行库存检查。我们在监听方法中添加了一个Acknowledge对象,并在注解的acknowledgementMode属性中设置了SqsListenerAcknowledgementMode.MANUAL。这个属性是一个String,接受属性占位符和SpEL。当我们将AcknowledgementMode设置为MANUAL时,才会提供Acknowledgement对象。

请注意,此示例利用了Spring Boot的自动配置,提供了合理的默认值,以及@SqsListener注解属性来切换确认模式。另一种选择是声明一个SqsMessageListenerContainerFactorybean,以便设置更复杂的配置。

6.2. 模拟慢速连接

现在,让我们在InventoryService类中添加slowCheckInventory方法,使用Thread.sleep模拟慢速连接:

public void slowCheckInventory(UUID productId, int quantity) {
    simulateBusyConnection();
    checkInventory(productId, quantity);
}

private void simulateBusyConnection() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}

6.3. 测试

接下来,编写我们的测试:

@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.PROCESSED));
    assertQueueIsEmpty(queueName, "async-order-processing-container");
}

这次,我们请求的是库存中有可用数量的产品,所以不应抛出任何错误。

运行测试时,我们会看到收到消息的日志信息:

INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]

然后,我们会看到线程释放消息:

INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Releasing processing thread.

这是因为我们异步处理并确认了消息。大约两秒后,我们应该看到消息已被确认的日志:

INFO 2786 --- [onPool-worker-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged

最后,我们会看到停止容器和断言队列为空的日志:

INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue

7. 无论成功或错误都确认

我们将探讨的最后一种确认模式是ALWAYS,它会导致框架无论监听方法是否抛出异常都会确认消息

7.1. 创建监听器

让我们模拟一次销售事件,库存有限,我们不希望重新处理任何消息,不论出现何种失败。我们将在application.yml中早先定义的属性中设置确认模式为ALWAYS

@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
    inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());

    logger.info("Message processed: {}", orderCreatedEvent);
}

在测试中,我们将创建一个超出库存数量的订单:

7.2. 测试

@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.RECEIVED));
    assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}

现在,即使OutOfStockException被抛出,消息也会被确认,不会尝试重新发送消息:

Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue

8. 总结

在这篇文章中,我们使用了一个事件驱动的场景,展示了Spring Cloud AWS v3 SQS集成提供的三种确认模式:ON_SUCCESS(默认)、MANUALALWAYS。我们利用了自动配置设置,并使用@SqsListener注解属性在模式之间切换。我们还创建了现场测试,使用Testcontainers和LocalStack来验证行为。