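1. Overview

Message conversion is the process of converting messages between different formats and representations as an application sends and receives them.

AWS SQS supports only text payloads, and the Spring Cloud AWS SQS integration provides familiar Spring abstractions that, by default, use JSON to handle serialization and deserialization of POJOs and records.

In this tutorial, we'll use an event-driven scenario to explore three common message conversion use cases: POJO/record serialization and deserialization, configuring a custom ObjectMapper, and deserializing to a subclass or interface implementation.

To verify these use cases, we'll build on the environment and test setup from the Spring Cloud AWS SQS V3 introductory article.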

2. Dependencies

Let's start by importing the Spring Cloud AWS Bill of Materials (BOM), which manages dependency versions and keeps them compatible:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>${spring-cloud-aws.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Now we can add the core and SQS starter dependencies:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

This tutorial uses the Spring Boot Web starter. Note that we don't specify a version, since it is managed through the imported Spring Cloud AWS BOM:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

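Finally, we add the test dependencies: LocalStack and TestContainers with JUnit 5, the awaitility library for verifying asynchronous message consumption, and AssertJ for fluent assertions: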

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <scope>test</scope>
</dependency>

3. Setting up the Local Test Environment

With the dependencies in place, we create BaseSqsLiveTest as the base class for our tests:

@Testcontainers
public class BaseSqsLiveTest {

    private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";

    @Container
    static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
        registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
        registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
        registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
          .toString());
    }
}

4. Configuring the Queue Names

Using Spring Boot's externalized configuration, we define the queue names in application.yml:

events:
  queues:
    shipping:
      simple-pojo-conversion-queue: shipping_pojo_conversion_queue
      custom-object-mapper-queue: shipping_custom_object_mapper_queue
      subclass-deserialization-queue: deserializes_subclass_queue

Next, we create a @ConfigurationProperties class to inject the queue names:

@ConfigurationProperties(prefix = "events.queues.shipping")
public class ShipmentEventsQueuesProperties {

    private String simplePojoConversionQueue;

    private String customObjectMapperQueue;

    private String subclassDeserializationQueue;

    // ...getters and setters
}
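
The YAML keys above use kebab-case, while the Java fields use camelCase, and the two must correspond for a value to be bound. As a rough illustration of that correspondence, the helper below converts one form into the other. It is a deliberate simplification and not Spring Boot's actual relaxed-binding implementation; the class and method names are hypothetical.

```java
class KebabToCamelSketch {

    // Converts a kebab-case key such as "custom-object-mapper-queue"
    // into the camelCase property name a Java field would use.
    static String toCamelCase(String kebab) {
        StringBuilder result = new StringBuilder();
        boolean upperNext = false;
        for (char c : kebab.toCharArray()) {
            if (c == '-') {
                upperNext = true; // drop the dash, capitalize what follows
            } else if (upperNext) {
                result.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                result.append(c);
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCamelCase("simple-pojo-conversion-queue"));
        System.out.println(toCamelCase("subclass-deserialization-queue"));
    }
}
```

A key that has no matching field is simply not bound, which leaves the field null and usually surfaces later as a listener or template that cannot resolve its queue.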

Then we add the @EnableConfigurationProperties annotation to a @Configuration class:

@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
}

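5. Setting up the Application

To illustrate our use cases, we'll create a Shipment microservice that reacts to ShipmentRequestedEvent events.

First, we create the Shipment entity: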

public class Shipment {

    private UUID orderId;
    private String customerAddress;
    private LocalDate shipBy;
    private ShipmentStatus status;

    public Shipment(){}

    public Shipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status) {
        this.orderId = orderId;
        this.customerAddress = customerAddress;
        this.shipBy = shipBy;
        this.status = status;
    }
    
    // ...getters and setters
}

Next, we add the ShipmentStatus enum:

public enum ShipmentStatus {
    REQUESTED,
    PROCESSED,
    CUSTOMS_CHECK,
    READY_FOR_DISPATCH,
    SENT,
    DELIVERED
}

Then we create the ShipmentRequestedEvent class:

public class ShipmentRequestedEvent {

    private UUID orderId;
    private String customerAddress;
    private LocalDate shipBy;

    public ShipmentRequestedEvent() {
    }

    public ShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy) {
        this.orderId = orderId;
        this.customerAddress = customerAddress;
        this.shipBy = shipBy;
    }

    public Shipment toDomain() {
        return new Shipment(orderId, customerAddress, shipBy, ShipmentStatus.REQUESTED);
    }

    // ...getters and setters
}

Finally, we create a ShipmentService to handle shipments, backed by a simulated in-memory repository that we'll use for test assertions:

@Service
public class ShipmentService {

    private static final Logger logger = LoggerFactory.getLogger(ShipmentService.class);

    private final Map<UUID, Shipment> shippingRepository = new ConcurrentHashMap<>();

    public void processShippingRequest(Shipment shipment) {
        logger.info("Processing shipping for order: {}", shipment.getOrderId());
        shipment.setStatus(ShipmentStatus.PROCESSED);
        shippingRepository.put(shipment.getOrderId(), shipment);
        logger.info("Shipping request processed: {}", shipment.getOrderId());
    }

    public Shipment getShipment(UUID requestId) {
        return shippingRepository.get(requestId);
    }
}

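6. Processing POJOs and Records With the Default Configuration

Spring Cloud AWS SQS pre-configures a SqsMessagingMessageConverter that automatically serializes and deserializes POJOs and records to and from JSON when we send or receive messages with SqsTemplate, @SqsListener annotations, or manually instantiated SqsMessageListenerContainer instances.

Our first use case sends and receives a simple POJO with the default configuration. We use @SqsListener to receive the message, and Spring Boot's auto-configuration takes care of deserialization.

First, we create a test that sends the message: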

@SpringBootTest
public class ShipmentServiceApplicationLiveTest extends BaseSqsLiveTest {

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private ShipmentService shipmentService;

    @Autowired
    private ShipmentEventsQueuesProperties queuesProperties;

    @Test
    void givenPojoPayload_whenMessageReceived_thenDeserializesCorrectly() {
        UUID orderId = UUID.randomUUID();
        ShipmentRequestedEvent shipmentRequestedEvent = new ShipmentRequestedEvent(orderId, "123 Main St", LocalDate.parse("2024-05-12"));

        sqsTemplate.send(queuesProperties.getSimplePojoConversionQueue(), shipmentRequestedEvent);

        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> {
                Shipment shipment = shipmentService.getShipment(orderId);
                assertThat(shipment).isNotNull();
                assertThat(shipment).usingRecursiveComparison()
                  .ignoringFields("status")
                  .isEqualTo(shipmentRequestedEvent);
                assertThat(shipment
                  .getStatus()).isEqualTo(ShipmentStatus.PROCESSED);
            });
    }
}

The test fails after the 10-second timeout because no listener is consuming from the queue yet. Let's add our first @SqsListener:

@Component
public class ShipmentRequestListener {

    private final ShipmentService shippingService;

    public ShipmentRequestListener(ShipmentService shippingService) {
        this.shippingService = shippingService;
    }

    @SqsListener("${events.queues.shipping.simple-pojo-conversion-queue}")
    public void receiveShipmentRequest(ShipmentRequestedEvent shipmentRequestedEvent) {
        shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
    }
}

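Running the test again, it passes. Note that the listener class must be annotated with @Component, and that the queue name is resolved from application.yml.

This example shows the POJO conversion that Spring Cloud AWS offers out of the box. The same works for Java records.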
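
Since records are supported in the same way, here is a minimal sketch of what a record-based counterpart of the event could look like. The name ShipmentRequestedRecord and the null check are illustrative assumptions and are not part of the original project.

```java
import java.time.LocalDate;
import java.util.UUID;

// Hypothetical record counterpart of ShipmentRequestedEvent (not in the original project).
record ShipmentRequestedRecord(UUID orderId, String customerAddress, LocalDate shipBy) {

    // Compact constructor: reject events that carry no order id.
    ShipmentRequestedRecord {
        if (orderId == null) {
            throw new IllegalArgumentException("orderId must not be null");
        }
    }
}

class RecordEventDemo {

    public static void main(String[] args) {
        var event = new ShipmentRequestedRecord(UUID.randomUUID(), "123 Main St", LocalDate.parse("2024-05-12"));
        // Records expose accessors named after their components instead of getters.
        System.out.println(event.customerAddress() + " ships by " + event.shipBy());
    }
}
```

A listener method would then declare ShipmentRequestedRecord as its parameter type in place of the POJO, with no getters, setters, or no-args constructor to maintain.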

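7. Configuring a Custom ObjectMapper

A common requirement for message conversion is an ObjectMapper configured with application-specific settings.

In our next scenario, we configure an ObjectMapper with a LocalDateDeserializer that parses dates in the "dd-MM-yyyy" format.

First, we create the test scenario, sending a raw JSON payload directly through the SqsAsyncClient that the framework auto-configures: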

    @Autowired
    private SqsAsyncClient sqsAsyncClient;

    @Test
    void givenShipmentRequestWithCustomDateFormat_whenMessageReceived_thenDeserializesDateCorrectly() {
        UUID orderId = UUID.randomUUID();
        String shipBy = LocalDate.parse("2024-05-12")
          .format(DateTimeFormatter.ofPattern("dd-MM-yyyy"));

        var jsonMessage = """
            {
                "orderId": "%s",
                "customerAddress": "123 Main St",
                "shipBy": "%s"
            }
            """.formatted(orderId, shipBy);

        sendRawMessage(queuesProperties.getCustomObjectMapperQueue(), jsonMessage);

        await().atMost(Duration.ofSeconds(10))
          .untilAsserted(() -> {
              var shipment = shipmentService.getShipment(orderId);
              assertThat(shipment).isNotNull();
              assertThat(shipment.getShipBy()).isEqualTo(LocalDate.parse(shipBy, DateTimeFormatter.ofPattern("dd-MM-yyyy")));
          });
    }

    private void sendRawMessage(String queueName, String jsonMessage) {
        sqsAsyncClient.getQueueUrl(req -> req.queueName(queueName))
          .thenCompose(resp -> sqsAsyncClient.sendMessage(req -> req.messageBody(jsonMessage)
              .queueUrl(resp.queueUrl())))
          .join();
    }

Next, we add a listener for this queue:

@SqsListener("${events.queues.shipping.custom-object-mapper-queue}")
public void receiveShipmentRequestWithCustomObjectMapper(ShipmentRequestedEvent shipmentRequestedEvent) {
    shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}

When we run the test, it fails, and the stack trace shows:

Cannot deserialize value of type `java.time.LocalDate` from String "12-05-2024"

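This happens because the payload does not use the standard "yyyy-MM-dd" date format. To fix it, we declare an ObjectMapper bean in a @Configuration class, and the auto-configuration applies it to both SqsTemplate and @SqsListener methods: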

@Bean
public ObjectMapper objectMapper() {
    ObjectMapper mapper = new ObjectMapper();
    JavaTimeModule module = new JavaTimeModule();
    LocalDateDeserializer customDeserializer = new LocalDateDeserializer(DateTimeFormatter.ofPattern("dd-MM-yyyy", Locale.getDefault()));
    module.addDeserializer(LocalDate.class, customDeserializer);
    mapper.registerModule(module);
    return mapper;
}

Running the test again, it passes.
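
To see why the date format matters, the behavior can be reproduced with the JDK alone. The sketch below assumes only java.time and uses a hypothetical class name: the ISO default rejects "12-05-2024", while a formatter built from the same "dd-MM-yyyy" pattern as the custom deserializer accepts it.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class DateFormatDemo {

    // Same pattern the custom LocalDateDeserializer is configured with.
    static final DateTimeFormatter CUSTOM_FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    // Returns true when the text parses with the ISO-8601 default (yyyy-MM-dd).
    static boolean parsesWithIsoDefault(String text) {
        try {
            LocalDate.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Parses the text with the custom day-first pattern.
    static LocalDate parseWithCustomFormat(String text) {
        return LocalDate.parse(text, CUSTOM_FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(parsesWithIsoDefault("12-05-2024"));  // false
        System.out.println(parseWithCustomFormat("12-05-2024")); // 2024-05-12
    }
}
```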

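8. Configuring Deserialization for Inheritance and Interfaces

Another common scenario is a superclass or interface with several subclasses or implementations, where we need to tell the framework which specific class to deserialize into, based on a message header or on part of the message itself.

We extend our scenario with two shipment types: InternationalShipment and DomesticShipment. Both are subclasses of Shipment, each with its own specific properties.

8.1 Creating the Entities and Events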

public class InternationalShipment extends Shipment {

    private String destinationCountry;
    private String customsInfo;

    public InternationalShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
        String destinationCountry, String customsInfo) {
        super(orderId, customerAddress, shipBy, status);
        this.destinationCountry = destinationCountry;
        this.customsInfo = customsInfo;
    }

    // ...getters and setters
}

public class DomesticShipment extends Shipment {

    private String deliveryRouteCode;

    public DomesticShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
        String deliveryRouteCode) {
        super(orderId, customerAddress, shipBy, status);
        this.deliveryRouteCode = deliveryRouteCode;
    }

    public String getDeliveryRouteCode() {
        return deliveryRouteCode;
    }

    public void setDeliveryRouteCode(String deliveryRouteCode) {
        this.deliveryRouteCode = deliveryRouteCode;
    }
}

Now we add the corresponding event classes:

public class DomesticShipmentRequestedEvent extends ShipmentRequestedEvent {

    private String deliveryRouteCode;

    public DomesticShipmentRequestedEvent(){}

    public DomesticShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String deliveryRouteCode) {
        super(orderId, customerAddress, shipBy);
        this.deliveryRouteCode = deliveryRouteCode;
    }

    public DomesticShipment toDomain() {
        return new DomesticShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, deliveryRouteCode);
    }

    // ...getters and setters
}

public class InternationalShipmentRequestedEvent extends ShipmentRequestedEvent {

    private String destinationCountry;
    private String customsInfo;

    public InternationalShipmentRequestedEvent(){}

    public InternationalShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String destinationCountry,
        String customsInfo) {
        super(orderId, customerAddress, shipBy);
        this.destinationCountry = destinationCountry;
        this.customsInfo = customsInfo;
    }

    public InternationalShipment toDomain() {
        return new InternationalShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, destinationCountry,
            customsInfo);
    }

    // ...getters and setters
}

8.2 Adding the Service and Listener Logic

We add two methods to ShipmentService, one for each shipment type:

@Service
public class ShipmentService {

    // ...previous code stays the same

    public void processDomesticShipping(DomesticShipment shipment) {
        logger.info("Processing domestic shipping for order: {}", shipment.getOrderId());
        shipment.setStatus(ShipmentStatus.READY_FOR_DISPATCH);
        shippingRepository.put(shipment.getOrderId(), shipment);
        logger.info("Domestic shipping processed: {}", shipment.getOrderId());
    }

    public void processInternationalShipping(InternationalShipment shipment) {
        logger.info("Processing international shipping for order: {}", shipment.getOrderId());
        shipment.setStatus(ShipmentStatus.CUSTOMS_CHECK);
        shippingRepository.put(shipment.getOrderId(), shipment);
        logger.info("International shipping processed: {}", shipment.getOrderId());
    }
}

Next, we add a listener that declares the superclass type so that it can receive both subtypes:

@SqsListener(queueNames = "${events.queues.shipping.subclass-deserialization-queue}")
public void receiveShippingRequestWithType(ShipmentRequestedEvent shipmentRequestedEvent) {
    if (shipmentRequestedEvent instanceof InternationalShipmentRequestedEvent event) {
        shippingService.processInternationalShipping(event.toDomain());
    } else if (shipmentRequestedEvent instanceof DomesticShipmentRequestedEvent event) {
        shippingService.processDomesticShipping(event.toDomain());
    } else {
        throw new RuntimeException("Event type not supported " + shipmentRequestedEvent.getClass()
            .getSimpleName());
    }
}

8.3 Deserializing With the Default Type Header Mapping

Let's create the test, starting with one event of each type:

@Test
void givenPayloadWithSubclasses_whenMessageReceived_thenDeserializesCorrectType() {
    var domesticOrderId = UUID.randomUUID();
    var domesticEvent = new DomesticShipmentRequestedEvent(domesticOrderId, "123 Main St", LocalDate.parse("2024-05-12"), "XPTO1234");
    var internationalOrderId = UUID.randomUUID();
    InternationalShipmentRequestedEvent internationalEvent = new InternationalShipmentRequestedEvent(internationalOrderId, "123 Main St", LocalDate.parse("2024-05-24"), "Canada", "HS Code: 8471.30, Origin: China, Value: $500");
}

By default, SqsTemplate sends a header containing type information for the payload. Taking advantage of this, we simply send the messages:

sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), internationalEvent);
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), domesticEvent);

Then we assert that each shipment ends up with the status expected for its type:

await().atMost(Duration.ofSeconds(10))
    .untilAsserted(() -> {
        var domesticShipment = (DomesticShipment) shipmentService.getShipment(domesticOrderId);
        assertThat(domesticShipment).isNotNull();
        assertThat(domesticShipment).usingRecursiveComparison()
          .ignoringFields("status")
          .isEqualTo(domesticEvent);
        assertThat(domesticShipment.getStatus()).isEqualTo(ShipmentStatus.READY_FOR_DISPATCH);

        var internationalShipment = (InternationalShipment) shipmentService.getShipment(internationalOrderId);
        assertThat(internationalShipment).isNotNull();
        assertThat(internationalShipment).usingRecursiveComparison()
          .ignoringFields("status")
          .isEqualTo(internationalEvent);
        assertThat(internationalShipment.getStatus()).isEqualTo(ShipmentStatus.CUSTOMS_CHECK);
    });

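The test passes, showing that each subclass was deserialized correctly.

8.4 Deserializing With a Custom Type Header Mapping

It's common to receive messages from services that don't send them with SqsTemplate, or from producers whose event POJOs live in a different package. In those cases, we need a custom mapping.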
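
To illustrate why the package matters, here is a minimal sketch that assumes the type header carries a fully qualified class name which the consumer resolves by reflection. The class name, the method name, and the fallback to Object are assumptions of this sketch, not the framework's actual code.

```java
class TypeHeaderSketch {

    // Resolves a fully qualified class name to a Class. Falls back to Object when
    // the class is not on the consumer's classpath (the fallback is an assumption).
    static Class<?> resolvePayloadType(String typeHeaderValue) {
        try {
            return Class.forName(typeHeaderValue);
        } catch (ClassNotFoundException e) {
            return Object.class;
        }
    }

    public static void main(String[] args) {
        // A class that exists on this classpath resolves to itself.
        System.out.println(resolvePayloadType("java.time.LocalDate"));
        // A producer-side package that the consumer doesn't have cannot be resolved.
        System.out.println(resolvePayloadType("com.example.producer.ShipmentRequestedEvent"));
    }
}
```

A name that only exists in the producer's codebase gives the consumer nothing to instantiate, which is the gap a custom mapping fills.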

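Let's create a custom SqsTemplate that sends messages without type information. It needs an ObjectMapper capable of serializing LocalDate, such as the one we configured earlier or the one auto-configured by Spring Boot: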

@Autowired
private ObjectMapper objectMapper;

var customTemplate = SqsTemplate.builder()
  .sqsAsyncClient(sqsAsyncClient)
  .configureDefaultConverter(converter -> {
        converter.doNotSendPayloadTypeHeader();
        converter.setObjectMapper(objectMapper);
    })
  .build();

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(internationalEvent));

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(domesticEvent));

The test fails, and the stack trace shows:

Could not read JSON: Unrecognized field "destinationCountry"
Could not read JSON: Unrecognized field "deliveryRouteCode"

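To solve this, we use the setPayloadTypeMapper method of SqsMessagingMessageConverter to choose the target class based on any property of the message. In this example, we use a custom header.

First, we add the header configuration to application.yml: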

headers:
  types:
    shipping:
      header-name: SHIPPING_TYPE
      international: INTERNATIONAL
      domestic: DOMESTIC

Then we create the properties class:

@ConfigurationProperties(prefix = "headers.types.shipping")
public class ShippingHeaderTypesProperties {

    private String headerName;
    private String international;
    private String domestic;

    // ...getters and setters
}

And we enable it in the configuration class:

@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class, ShippingHeaderTypesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
   // ...rest of code remains the same
}

Now we configure a custom SqsMessagingMessageConverter and set it on the defaultSqsListenerContainerFactory bean:

@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient, ObjectMapper objectMapper, ShippingHeaderTypesProperties typesProperties) {
    SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
    converter.setPayloadTypeMapper(message -> {
        if (!message.getHeaders()
          .containsKey(typesProperties.getHeaderName())) {
            return Object.class;
        }
        String eventTypeHeader = MessageHeaderUtils.getHeaderAsString(message, typesProperties.getHeaderName());
        if (eventTypeHeader.equals(typesProperties.getDomestic())) {
            return DomesticShipmentRequestedEvent.class;
        } else if (eventTypeHeader.equals(typesProperties.getInternational())) {
            return InternationalShipmentRequestedEvent.class;
        }
        throw new RuntimeException("Invalid shipping type");
    });
    converter.setObjectMapper(objectMapper);

    return SqsMessageListenerContainerFactory.builder()
      .sqsAsyncClient(sqsAsyncClient)
      .configure(configure -> configure.messageConverter(converter))
      .build();
}
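
The mapping decision itself can be exercised without Spring. The sketch below reproduces the header-to-class logic against a plain Map of headers. The class name, the empty stand-in event classes, and the hard-coded header values are assumptions made to keep the example self-contained.

```java
import java.util.Map;

class PayloadTypeMapperSketch {

    // Empty stand-ins for the tutorial's event classes.
    static class ShipmentRequestedEvent {}
    static class DomesticShipmentRequestedEvent extends ShipmentRequestedEvent {}
    static class InternationalShipmentRequestedEvent extends ShipmentRequestedEvent {}

    static final String HEADER_NAME = "SHIPPING_TYPE";

    // Picks the target class from the custom header, mirroring the mapper above.
    static Class<?> resolveType(Map<String, ?> headers) {
        Object shippingType = headers.get(HEADER_NAME);
        if (shippingType == null) {
            return Object.class; // no custom header present: this mapper has no opinion
        }
        return switch (shippingType.toString()) {
            case "DOMESTIC" -> DomesticShipmentRequestedEvent.class;
            case "INTERNATIONAL" -> InternationalShipmentRequestedEvent.class;
            default -> throw new IllegalArgumentException("Invalid shipping type: " + shippingType);
        };
    }

    public static void main(String[] args) {
        System.out.println(resolveType(Map.of(HEADER_NAME, "DOMESTIC")).getSimpleName());
        System.out.println(resolveType(Map.of(HEADER_NAME, "INTERNATIONAL")).getSimpleName());
        System.out.println(resolveType(Map.of()).getSimpleName());
    }
}
```

Keeping the decision in a small pure function like this makes it easy to unit test separately from the listener container.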

In the test, we add the headers when sending with the custom template:

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(internationalEvent)
  .header(headerTypesProperties.getHeaderName(), headerTypesProperties.getInternational()));

customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
  .payload(domesticEvent)
  .header(headerTypesProperties.getHeaderName(), headerTypesProperties.getDomestic()));

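Running the test again, it passes, confirming that each event was deserialized into the correct subclass.

9. Conclusion

In this article, we explored three common message conversion use cases:

  • Out-of-the-box serialization and deserialization of POJOs and records
  • A custom ObjectMapper for application-specific settings such as a different date format
  • Two ways of deserializing to a subclass or interface implementation

We verified each scenario by setting up a local test environment and writing live tests. The complete code is available in the GitHub repository.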


Original title: Message Conversion in Spring Cloud AWS v3 | Baeldung