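1. Overview
Message conversion is the process of translating messages between different formats and representations as an application sends and receives them.
Amazon SQS only accepts text payloads. The Spring Cloud AWS SQS integration adds familiar Spring abstractions on top of it and, by default, uses JSON to serialize and deserialize POJOs and records.
In this tutorial, we'll use an event-driven scenario to walk through three common message conversion use cases: serializing and deserializing POJOs and records, configuring a custom ObjectMapper, and deserializing to a subclass or interface implementation.
To verify these use cases, we'll reuse the environment and test setup from the introductory article on Spring Cloud AWS SQS V3.
2. Dependencies
First, import the Spring Cloud AWS Bill of Materials (BOM), which manages dependency versions so that they stay compatible with each other: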
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Next, add the core and SQS starter dependencies:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
This tutorial uses the Spring Boot Web starter. Note that we don't specify a version here, because dependency versions are managed through the imported BOM:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Finally, add the test dependencies: LocalStack and Testcontainers, JUnit 5, the Awaitility library for verifying asynchronous message consumption, and AssertJ for fluent assertions:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
3. Setting Up the Local Test Environment
With the dependencies in place, create BaseSqsLiveTest as the base class for our tests:
@Testcontainers
public class BaseSqsLiveTest {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";
@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
}
}
4. Configuring the Queue Names
Using Spring Boot's externalized configuration, define the queue names in application.yml:
events:
  queues:
    shipping:
      simple-pojo-conversion-queue: shipping_pojo_conversion_queue
      custom-object-mapper-queue: shipping_custom_object_mapper_queue
      subclass-deserialization-queue: deserializes_subclass_queue
Create a class annotated with @ConfigurationProperties so the queue names can be injected:
@ConfigurationProperties(prefix = "events.queues.shipping")
public class ShipmentEventsQueuesProperties {
private String simplePojoConversionQueue;
private String customObjectMapperQueue;
private String subclassDeserializationQueue;
// ...getters and setters
}
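Each kebab-case key in application.yml has to line up with a camelCase field in the properties class, otherwise the field stays null. As a rough illustration of that naming correspondence, here is a minimal sketch; KebabToCamel and toCamelCase are hypothetical names, and this is not Spring Boot's actual relaxed-binding implementation, which handles many more forms:

```java
// Simplified illustration only; Spring Boot's relaxed binding is more general than this.
class KebabToCamel {

    // Converts a kebab-case key such as "simple-pojo-conversion-queue"
    // into the matching camelCase property name "simplePojoConversionQueue".
    static String toCamelCase(String kebab) {
        StringBuilder result = new StringBuilder();
        boolean upperNext = false;
        for (char c : kebab.toCharArray()) {
            if (c == '-') {
                upperNext = true; // drop the dash, capitalize the next character
            } else if (upperNext) {
                result.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                result.append(c);
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCamelCase("simple-pojo-conversion-queue"));
        System.out.println(toCamelCase("custom-object-mapper-queue"));
    }
}
```

Running the key names through such a check is a quick way to spot a YAML key that no field in the properties class corresponds to.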
Then enable it by adding @EnableConfigurationProperties to a @Configuration class:
@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
}
5. Setting Up the Application
To illustrate the use cases, we'll create a Shipment microservice that reacts to ShipmentRequestedEvent events.
First, create the Shipment entity:
public class Shipment {
private UUID orderId;
private String customerAddress;
private LocalDate shipBy;
private ShipmentStatus status;
public Shipment(){}
public Shipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status) {
this.orderId = orderId;
this.customerAddress = customerAddress;
this.shipBy = shipBy;
this.status = status;
}
// ...getters and setters
}
Add the ShipmentStatus enum:
public enum ShipmentStatus {
REQUESTED,
PROCESSED,
CUSTOMS_CHECK,
READY_FOR_DISPATCH,
SENT,
DELIVERED
}
Create the ShipmentRequestedEvent class:
public class ShipmentRequestedEvent {
private UUID orderId;
private String customerAddress;
private LocalDate shipBy;
public ShipmentRequestedEvent() {
}
public ShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy) {
this.orderId = orderId;
this.customerAddress = customerAddress;
this.shipBy = shipBy;
}
public Shipment toDomain() {
return new Shipment(orderId, customerAddress, shipBy, ShipmentStatus.REQUESTED);
}
// ...getters and setters
}
Create a ShipmentService to handle the shipping logic. It is backed by a simulated in-memory repository that our tests assert against:
@Service
public class ShipmentService {
private static final Logger logger = LoggerFactory.getLogger(ShipmentService.class);
private final Map<UUID, Shipment> shippingRepository = new ConcurrentHashMap<>();
public void processShippingRequest(Shipment shipment) {
logger.info("Processing shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.PROCESSED);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("Shipping request processed: {}", shipment.getOrderId());
}
public Shipment getShipment(UUID requestId) {
return shippingRepository.get(requestId);
}
}
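6. Handling POJOs and Records With the Default Configuration
Spring Cloud AWS SQS pre-configures a SqsMessagingMessageConverter that serializes and deserializes POJOs and records to and from JSON whenever messages are sent or received with SqsTemplate, the @SqsListener annotation, or a manually instantiated SqsMessageListenerContainer.
Our first use case sends and receives a simple POJO with the default configuration. We'll receive the message with @SqsListener and rely on Spring Boot's auto-configuration for deserialization.
First, create a test that sends the message: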
@SpringBootTest
public class ShipmentServiceApplicationLiveTest extends BaseSqsLiveTest {
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private ShipmentService shipmentService;
@Autowired
private ShipmentEventsQueuesProperties queuesProperties;
@Test
void givenPojoPayload_whenMessageReceived_thenDeserializesCorrectly() {
UUID orderId = UUID.randomUUID();
ShipmentRequestedEvent shipmentRequestedEvent = new ShipmentRequestedEvent(orderId, "123 Main St", LocalDate.parse("2024-05-12"));
sqsTemplate.send(queuesProperties.getSimplePojoConversionQueue(), shipmentRequestedEvent);
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
Shipment shipment = shipmentService.getShipment(orderId);
assertThat(shipment).isNotNull();
assertThat(shipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(shipmentRequestedEvent);
assertThat(shipment
.getStatus()).isEqualTo(ShipmentStatus.PROCESSED);
});
}
}
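The assertion block in this test is retried until it stops failing or the timeout elapses, which is what makes it suitable for asynchronous consumption. The idea can be sketched with the JDK alone; PollingAssert and its untilAsserted method are hypothetical names for illustration and do not reflect Awaitility's actual implementation:

```java
import java.time.Duration;

// Illustrative sketch of poll-until-asserted; not Awaitility's real code.
class PollingAssert {

    // Re-runs the assertion until it passes or the timeout elapses.
    static void untilAsserted(Duration atMost, Duration pollInterval, Runnable assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed, so we are done
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // give up and surface the last failure
                }
                try {
                    Thread.sleep(pollInterval.toMillis());
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while polling", interrupted);
                }
            }
        }
    }
}
```

With this shape, a message that is consumed after a short delay still lets the test pass, while a message that is never consumed fails with the last assertion error once the deadline is reached.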
This test fails after the 10-second timeout, because nothing is listening on the queue yet. Let's add our first @SqsListener:
@Component
public class ShipmentRequestListener {
private final ShipmentService shippingService;
public ShipmentRequestListener(ShipmentService shippingService) {
this.shippingService = shippingService;
}
@SqsListener("${events.queues.shipping.simple-pojo-conversion-queue}")
public void receiveShipmentRequest(ShipmentRequestedEvent shipmentRequestedEvent) {
shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}
}
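Running the test again, it passes. Note that the listener class needs the @Component annotation, and that the queue name is resolved from application.yml.
This example shows Spring Cloud AWS's out-of-the-box POJO conversion. Java records work the same way.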
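For reference, a record-based counterpart of the event could look like the sketch below. ShipmentRequestedRecord is a hypothetical name used only to avoid clashing with the class above; the components mirror the POJO's fields, so the JSON shape stays the same:

```java
import java.time.LocalDate;
import java.util.UUID;

// Hypothetical record equivalent of ShipmentRequestedEvent (same three fields).
// The canonical constructor and accessors are generated by the compiler.
record ShipmentRequestedRecord(UUID orderId, String customerAddress, LocalDate shipBy) {
}
```

A listener method would then declare this record as its parameter type instead of the POJO.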
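7. Configuring a Custom ObjectMapper
A common message conversion requirement is an ObjectMapper configured with application-specific settings.
In the next scenario, we'll configure an ObjectMapper with a LocalDateDeserializer that parses dates in the "dd-MM-yyyy" format.
First, create the test scenario. It sends a raw JSON payload directly through the SqsAsyncClient that the framework auto-configures: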
@Autowired
private SqsAsyncClient sqsAsyncClient;
@Test
void givenShipmentRequestWithCustomDateFormat_whenMessageReceived_thenDeserializesDateCorrectly() {
UUID orderId = UUID.randomUUID();
String shipBy = LocalDate.parse("2024-05-12")
.format(DateTimeFormatter.ofPattern("dd-MM-yyyy"));
var jsonMessage = """
{
"orderId": "%s",
"customerAddress": "123 Main St",
"shipBy": "%s"
}
""".formatted(orderId, shipBy);
sendRawMessage(queuesProperties.getCustomObjectMapperQueue(), jsonMessage);
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
var shipment = shipmentService.getShipment(orderId);
assertThat(shipment).isNotNull();
assertThat(shipment.getShipBy()).isEqualTo(LocalDate.parse(shipBy, DateTimeFormatter.ofPattern("dd-MM-yyyy")));
});
}
private void sendRawMessage(String queueName, String jsonMessage) {
sqsAsyncClient.getQueueUrl(req -> req.queueName(queueName))
.thenCompose(resp -> sqsAsyncClient.sendMessage(req -> req.messageBody(jsonMessage)
.queueUrl(resp.queueUrl())))
.join();
}
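The sendRawMessage helper chains two asynchronous steps: it resolves the queue URL, then sends the message to that URL. The composition pattern can be tried in isolation with plain CompletableFuture; in this sketch the two static methods are stand-ins for the SQS client calls, and the URL is a made-up placeholder:

```java
import java.util.concurrent.CompletableFuture;

// Stand-alone sketch of the thenCompose chaining used by sendRawMessage.
class AsyncChainSketch {

    // Stand-in for the queue URL lookup; the URL format here is a placeholder.
    static CompletableFuture<String> getQueueUrl(String queueName) {
        return CompletableFuture.supplyAsync(() -> "http://localhost/queues/" + queueName);
    }

    // Stand-in for the send call; it just reports what would be sent and where.
    static CompletableFuture<String> sendMessage(String queueUrl, String body) {
        return CompletableFuture.supplyAsync(() -> "sent " + body + " to " + queueUrl);
    }

    // thenCompose flattens the nested future, and join() blocks until both steps finish.
    static String sendRaw(String queueName, String body) {
        return getQueueUrl(queueName)
            .thenCompose(url -> sendMessage(url, body))
            .join();
    }
}
```

Blocking with join() keeps the test code linear, which is reasonable in a test even though production code would usually stay asynchronous.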
Then add a listener for this queue:
@SqsListener("${events.queues.shipping.custom-object-mapper-queue}")
public void receiveShipmentRequestWithCustomObjectMapper(ShipmentRequestedEvent shipmentRequestedEvent) {
shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}
Running the test fails, and the stack trace contains:
Cannot deserialize value of type `java.time.LocalDate` from String "12-05-2024"
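That's because the payload uses "dd-MM-yyyy" instead of the standard "yyyy-MM-dd" date format. To fix it, declare an ObjectMapper bean in a @Configuration class. The auto-configuration then applies it to both the auto-configured SqsTemplate and the @SqsListener methods: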
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
JavaTimeModule module = new JavaTimeModule();
LocalDateDeserializer customDeserializer = new LocalDateDeserializer(DateTimeFormatter.ofPattern("dd-MM-yyyy", Locale.getDefault()));
module.addDeserializer(LocalDate.class, customDeserializer);
mapper.registerModule(module);
return mapper;
}
Running the test again, it passes.
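The same behavior can be reproduced with the JDK alone, without SQS or Jackson: the ISO parser rejects "12-05-2024", while a formatter built from the "dd-MM-yyyy" pattern accepts it. DateFormatSketch and its methods are hypothetical names for this illustration:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// JDK-only illustration of why the custom date pattern is needed.
class DateFormatSketch {

    static final DateTimeFormatter CUSTOM = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    // LocalDate.parse(text) uses the ISO "yyyy-MM-dd" format.
    static boolean parsesAsIso(String text) {
        try {
            LocalDate.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Parses the day-first format used by the raw JSON payload in the test.
    static LocalDate parseCustom(String text) {
        return LocalDate.parse(text, CUSTOM);
    }
}
```

Here parsesAsIso("12-05-2024") returns false, whereas parseCustom("12-05-2024") yields May 12, 2024, the date the test asserts on.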
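8. Configuring Deserialization for Inheritance and Interfaces
Another common scenario is a superclass or interface with several subclasses or implementations. In that case, we need to tell the framework which concrete class to deserialize to, based on a message header or on the message content.
We'll extend our scenario with two shipment types, InternationalShipment and DomesticShipment. Both are subclasses of Shipment, and each has its own specific properties.
8.1 Creating the Entities and Events
First, the two Shipment subclasses: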
public class InternationalShipment extends Shipment {
private String destinationCountry;
private String customsInfo;
public InternationalShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
String destinationCountry, String customsInfo) {
super(orderId, customerAddress, shipBy, status);
this.destinationCountry = destinationCountry;
this.customsInfo = customsInfo;
}
// ...getters and setters
}
public class DomesticShipment extends Shipment {
private String deliveryRouteCode;
public DomesticShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
String deliveryRouteCode) {
super(orderId, customerAddress, shipBy, status);
this.deliveryRouteCode = deliveryRouteCode;
}
public String getDeliveryRouteCode() {
return deliveryRouteCode;
}
public void setDeliveryRouteCode(String deliveryRouteCode) {
this.deliveryRouteCode = deliveryRouteCode;
}
}
Then add the corresponding event classes:
public class DomesticShipmentRequestedEvent extends ShipmentRequestedEvent {
private String deliveryRouteCode;
public DomesticShipmentRequestedEvent(){}
public DomesticShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String deliveryRouteCode) {
super(orderId, customerAddress, shipBy);
this.deliveryRouteCode = deliveryRouteCode;
}
public DomesticShipment toDomain() {
return new DomesticShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, deliveryRouteCode);
}
// ...getters and setters
}
public class InternationalShipmentRequestedEvent extends ShipmentRequestedEvent {
private String destinationCountry;
private String customsInfo;
public InternationalShipmentRequestedEvent(){}
public InternationalShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String destinationCountry,
String customsInfo) {
super(orderId, customerAddress, shipBy);
this.destinationCountry = destinationCountry;
this.customsInfo = customsInfo;
}
public InternationalShipment toDomain() {
return new InternationalShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, destinationCountry,
customsInfo);
}
// ...getters and setters
}
8.2 Adding the Service and Listener Logic
Add a processing method for each shipment type to ShipmentService:
@Service
public class ShipmentService {
// ...previous code stays the same
public void processDomesticShipping(DomesticShipment shipment) {
logger.info("Processing domestic shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.READY_FOR_DISPATCH);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("Domestic shipping processed: {}", shipment.getOrderId());
}
public void processInternationalShipping(InternationalShipment shipment) {
logger.info("Processing international shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.CUSTOMS_CHECK);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("International shipping processed: {}", shipment.getOrderId());
}
}
Add a listener to handle the messages. It declares the superclass as its parameter type so that it can receive both subtypes:
@SqsListener(queueNames = "${events.queues.shipping.subclass-deserialization-queue}")
public void receiveShippingRequestWithType(ShipmentRequestedEvent shipmentRequestedEvent) {
if (shipmentRequestedEvent instanceof InternationalShipmentRequestedEvent event) {
shippingService.processInternationalShipping(event.toDomain());
} else if (shipmentRequestedEvent instanceof DomesticShipmentRequestedEvent event) {
shippingService.processDomesticShipping(event.toDomain());
} else {
throw new RuntimeException("Event type not supported " + shipmentRequestedEvent.getClass()
.getSimpleName());
}
}
8.3 Deserializing With the Default Type Header Mapping
Create a test that starts by building one event of each type:
@Test
void givenPayloadWithSubclasses_whenMessageReceived_thenDeserializesCorrectType() {
var domesticOrderId = UUID.randomUUID();
var domesticEvent = new DomesticShipmentRequestedEvent(domesticOrderId, "123 Main St", LocalDate.parse("2024-05-12"), "XPTO1234");
var internationalOrderId = UUID.randomUUID();
InternationalShipmentRequestedEvent internationalEvent = new InternationalShipmentRequestedEvent(internationalOrderId, "123 Main St", LocalDate.parse("2024-05-24"), "Canada", "HS Code: 8471.30, Origin: China, Value: $500");
}
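By default, SqsTemplate sends a header containing the payload's type information. Taking advantage of that, we can simply send the messages from within the same test method: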
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), internationalEvent);
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), domesticEvent);
Then assert that each shipment ends up with the status that matches its type:
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
var domesticShipment = (DomesticShipment) shipmentService.getShipment(domesticOrderId);
assertThat(domesticShipment).isNotNull();
assertThat(domesticShipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(domesticEvent);
assertThat(domesticShipment.getStatus()).isEqualTo(ShipmentStatus.READY_FOR_DISPATCH);
var internationalShipment = (InternationalShipment) shipmentService.getShipment(internationalOrderId);
assertThat(internationalShipment).isNotNull();
assertThat(internationalShipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(internationalEvent);
assertThat(internationalShipment.getStatus()).isEqualTo(ShipmentStatus.CUSTOMS_CHECK);
});
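The test passes, which shows that each event was deserialized to the correct subclass.
8.4 Deserializing With a Custom Type Header Mapping
A custom mapping is needed when messages come from a service that doesn't use SqsTemplate, or when the event POJOs live in different packages on the sending and receiving sides.
To simulate this, create a custom SqsTemplate that sends messages without type information. It needs an ObjectMapper that can serialize LocalDate, such as the one we configured earlier or the one Spring Boot auto-configures: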
@Autowired
private ObjectMapper objectMapper;
var customTemplate = SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient)
.configureDefaultConverter(converter -> {
converter.doNotSendPayloadTypeHeader();
converter.setObjectMapper(objectMapper);
})
.build();
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(internationalEvent));
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(domesticEvent));
The test now fails, and the stack trace contains:
Could not read JSON: Unrecognized field "destinationCountry"
Could not read JSON: Unrecognized field "deliveryRouteCode"
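To fix this, we use the setPayloadTypeMapper method of SqsMessagingMessageConverter to choose the target class based on message attributes. In this example, we use a custom header.
First, add the header configuration to application.yml: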
headers:
  types:
    shipping:
      header-name: SHIPPING_TYPE
      international: INTERNATIONAL
      domestic: DOMESTIC
Create a properties class for these values:
@ConfigurationProperties(prefix = "headers.types.shipping")
public class ShippingHeaderTypesProperties {
private String headerName;
private String international;
private String domestic;
// ...getters and setters
}
Enable the properties in the configuration class:
@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class, ShippingHeaderTypesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
// ...rest of code remains the same
}
Then configure a custom SqsMessagingMessageConverter and set it on the defaultSqsListenerContainerFactory bean:
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient,
ObjectMapper objectMapper, ShippingHeaderTypesProperties typesProperties) {
SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
converter.setPayloadTypeMapper(message -> {
if (!message.getHeaders()
.containsKey(typesProperties.getHeaderName())) {
return Object.class;
}
String eventTypeHeader = MessageHeaderUtils.getHeaderAsString(message, typesProperties.getHeaderName());
if (eventTypeHeader.equals(typesProperties.getDomestic())) {
return DomesticShipmentRequestedEvent.class;
} else if (eventTypeHeader.equals(typesProperties.getInternational())) {
return InternationalShipmentRequestedEvent.class;
}
throw new RuntimeException("Invalid shipping type");
});
converter.setObjectMapper(objectMapper);
return SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(configure -> configure.messageConverter(converter))
.build();
}
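The mapping rule itself is easy to check in isolation. The sketch below expresses it as a plain function from a header map to a target class, assuming the header name and values from application.yml; it uses a simple Map and nested placeholder classes rather than Spring's Message type and the real event classes, so it only illustrates the decision logic and not the converter API:

```java
import java.util.Map;
import java.util.function.Function;

// Decision logic only; the nested classes are empty placeholders for the real events.
class PayloadTypeMapperSketch {

    static class ShipmentRequestedEvent { }
    static class DomesticShipmentRequestedEvent extends ShipmentRequestedEvent { }
    static class InternationalShipmentRequestedEvent extends ShipmentRequestedEvent { }

    // Assumed to match headers.types.shipping.header-name in application.yml.
    static final String HEADER_NAME = "SHIPPING_TYPE";

    static final Function<Map<String, Object>, Class<?>> TYPE_MAPPER = headers -> {
        Object type = headers.get(HEADER_NAME);
        if (type == null) {
            return Object.class; // no header present, so no specific type is chosen
        }
        return switch (type.toString()) {
            case "DOMESTIC" -> DomesticShipmentRequestedEvent.class;
            case "INTERNATIONAL" -> InternationalShipmentRequestedEvent.class;
            default -> throw new IllegalArgumentException("Invalid shipping type: " + type);
        };
    };
}
```

Keeping the rule in a small function like this makes it straightforward to unit test separately from the listener container.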
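Back in the test, with ShippingHeaderTypesProperties autowired as headerTypesProperties, add the header to the messages sent with the custom template: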
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(internationalEvent)
.header(headerTypesProperties.getHeaderName(), headerTypesProperties.getInternational()));
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(domesticEvent)
.header(headerTypesProperties.getHeaderName(), headerTypesProperties.getDomestic()));
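Running the test again, it passes, which confirms that each event is deserialized to the correct subclass.
9. Conclusion
In this article, we explored three common message conversion use cases:
- out-of-the-box serialization and deserialization of POJOs and records
- a custom ObjectMapper for application-specific settings such as a different date format
- two ways of deserializing to a subclass or interface implementation
We verified each scenario by setting up a local test environment and writing live tests. The complete code is available in the GitHub repository.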