Spring Cloud AWS v3 中的訊息轉換
1. 概述
訊息轉換是應用程式傳輸和接收訊息時在不同格式和表示形式之間進行轉換的過程。
AWS SQS 允許文字有效負載,Spring Cloud AWS SQS 整合提供了熟悉的 Spring 抽象,預設使用 JSON 來管理與 POJO 和記錄之間的文字有效負載的序列化和反序列化。
在本教程中,我們將使用事件驅動場景來了解訊息轉換的三個常見用例:POJO/記錄序列化和反序列化、設定自訂ObjectMapper
以及反序列化為子類別/介面實作。
為了測試我們的用例,我們將利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設定。
2. 依賴關係
讓我們先導入Spring Cloud AWS Bill of Materials ,它管理我們的依賴項的版本,確保它們之間的版本相容性:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
現在,我們可以新增核心和 SQS 啟動相依性:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
在本教程中,我們將使用 Spring Boot Web 啟動器。值得注意的是,我們沒有指定版本,因為我們正在匯入 Spring Cloud AWS 的 BOM :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
最後,讓我們新增測試依賴項 – LocalStack 和 TestContainers with JUnit 5,用於驗證非同步訊息消耗的等待庫,以及使用 Fluent API 處理斷言的 AssertJ,以及 Spring Boot 的測試依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
3.建置本地測試環境
現在我們已經新增了依賴項,我們將透過建立BaseSqsLiveTest
來設定測試環境,該測試套件應由我們的測試套件擴展:
@Testcontainers
public class BaseSqsLiveTest {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";
@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
}
}
4. 設定隊列名稱
為了利用 Spring Boot 的設定外部化,我們將在application.yml
檔案中新增佇列名稱:
events:
queues:
shipping:
simple-pojo-conversion-queue: shipping_pojo_conversion_queue
custom-object-mapper-queue: shipping_custom_object_mapper_queue
deserializes-subclass: deserializes_subclass_queue
我們現在創建一個@ConfigurationProperties
註解的類,我們將其註入到我們的測試中以檢索隊列名稱:
@ConfigurationProperties(prefix = "events.queues.shipping")
public class ShipmentEventsQueuesProperties {
private String simplePojoConversionQueue;
private String customObjectMapperQueue;
private String subclassDeserializationQueue;
// ...getters and setters
}
最後,我們將@EnableConfigurationProperties
註解加入到@Configuration
類別中:
@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
}
5. 設定應用程式
我們將創建一個對ShipmentRequestedEvent
做出反應的Shipment
微服務來說明我們的用例.
首先,讓我們建立Shipment
實體,它將保存有關出貨的資訊:
public class Shipment {
private UUID orderId;
private String customerAddress;
private LocalDate shipBy;
private ShipmentStatus status;
public Shipment(){}
public Shipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status) {
this.orderId = orderId;
this.customerAddress = customerAddress;
this.shipBy = shipBy;
this.status = status;
}
// ...getters and setters
}
接下來,我們加入一個ShipmentStatus
枚舉:
public enum ShipmentStatus {
REQUESTED,
PROCESSED,
CUSTOMS_CHECK,
READY_FOR_DISPATCH,
SENT,
DELIVERED
}
我們還需要ShipmentRequestedEvent
:
public class ShipmentRequestedEvent {
private UUID orderId;
private String customerAddress;
private LocalDate shipBy;
public ShipmentRequestedEvent() {
}
public ShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy) {
this.orderId = orderId;
this.customerAddress = customerAddress;
this.shipBy = shipBy;
}
public Shipment toDomain() {
return new Shipment(orderId, customerAddress, shipBy, ShipmentStatus.REQUESTED);
}
// ...getters and setters
}
為了處理我們的發貨,我們將創建一個簡單的ShipmentService
類,其中包含一個模擬存儲庫,我們將使用它來斷言我們的測試:
@Service
public class ShipmentService {
private static final Logger logger = LoggerFactory.getLogger(ShipmentService.class);
private final Map<UUID, Shipment> shippingRepository = new ConcurrentHashMap<>();
public void processShippingRequest(Shipment shipment) {
logger.info("Processing shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.PROCESSED);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("Shipping request processed: {}", shipment.getOrderId());
}
public Shipment getShipment(UUID requestId) {
return shippingRepository.get(requestId);
}
}
6. 使用預設設定處理 POJO 和記錄
Spring Cloud AWS SQS 預先配置了一個 SqsMessagingMessageConverter ,當使用SqsTemplate,
@ SqsListener
註解或手動實例化的SqsMessageListenerContainer
發送和接收訊息時, SqsMessagingMessageConverter
會序列化和反序列化 POJO 和記錄與JSON
。
我們的第一個用例將發送和接收一個簡單的 POJO 來說明此預設配置。我們將使用@SqsListener
註解來接收訊息,並在必要時使用 Spring Boot 的自動配置來配置反序列化。
首先,我們將建立發送訊息的測試:
@SpringBootTest
public class ShipmentServiceApplicationLiveTest extends BaseSqsLiveTest {
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private ShipmentService shipmentService;
@Autowired
private ShipmentEventsQueuesProperties queuesProperties;
@Test
void givenPojoPayload_whenMessageReceived_thenDeserializesCorrectly() {
UUID orderId = UUID.randomUUID();
ShipmentRequestedEvent shipmentRequestedEvent = new ShipmentRequestedEvent(orderId, "123 Main St", LocalDate.parse("2024-05-12"));
sqsTemplate.send(queuesProperties.getSimplePojoConversionQueue(), shipmentRequestedEvent);
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
Shipment shipment = shipmentService.getShipment(orderId);
assertThat(shipment).isNotNull();
assertThat(shipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(shipmentRequestedEvent);
assertThat(shipment
.getStatus()).isEqualTo(ShipmentStatus.PROCESSED);
});
}
}
在這裡,我們建立事件,使用自動配置的SqsTemplate
將其傳送到佇列,並等待狀態變為PROCESSED
。這表示訊息已成功接收並處理。
當測試被觸發時,它會在10
秒後失敗,因為我們還沒有隊列的偵聽器。
讓我們透過建立第一個@SqsListener
來解決這個問題:
@Component
public class ShipmentRequestListener {
private final ShipmentService shippingService;
public ShipmentRequestListener(ShipmentService shippingService) {
this.shippingService = shippingService;
}
@SqsListener("${events.queues.shipping.simple-pojo-conversion-queue}")
public void receiveShipmentRequest(ShipmentRequestedEvent shipmentRequestedEvent) {
shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}
}
當我們再次運行測試時,它會在一段時間後通過。
值得注意的是,偵聽器具有@Component
註釋,並且我們引用在application.yml
檔案中設定的佇列名稱。
此範例展示了 Spring Cloud AWS 如何處理開箱即用的 POJO 轉換,這與 Java 記錄的工作方式相同。
7. 配置自訂物件映射器
訊息轉換的常見用例是使用特定於應用程式的配置設定自訂ObjectMapper
。
對於下一個場景,我們將使用LocalDateDeserializer
配置ObjectMapper
以讀取“dd-MM-yyyy”
格式的日期。
同樣,我們將首先建立測試場景。在這種情況下,我們將直接透過框架自動配置的SqsAsyncClient
發送原始 JSON 有效負載:
@Autowired
private SqsAsyncClient sqsAsyncClient;
@Test
void givenShipmentRequestWithCustomDateFormat_whenMessageReceived_thenDeserializesDateCorrectly() {
UUID orderId = UUID.randomUUID();
String shipBy = LocalDate.parse("2024-05-12")
.format(DateTimeFormatter.ofPattern("dd-MM-yyyy"));
var jsonMessage = """
{
"orderId": "%s",
"customerAddress": "123 Main St",
"shipBy": "%s"
}
""".formatted(orderId, shipBy);
sendRawMessage(queuesProperties.getCustomObjectMapperQueue(), jsonMessage);
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
var shipment = shipmentService.getShipment(orderId);
assertThat(shipment).isNotNull();
assertThat(shipment.getShipBy()).isEqualTo(LocalDate.parse(shipBy, DateTimeFormatter.ofPattern("dd-MM-yyyy")));
});
}
private void sendRawMessage(String queueName, String jsonMessage) {
sqsAsyncClient.getQueueUrl(req -> req.queueName(queueName))
.thenCompose(resp -> sqsAsyncClient.sendMessage(req -> req.messageBody(jsonMessage)
.queueUrl(resp.queueUrl())))
.join();
}
我們也為此隊列添加偵聽器:
@SqsListener("${events.queues.shipping.custom-object-mapper-queue}")
public void receiveShipmentRequestWithCustomObjectMapper(ShipmentRequestedEvent shipmentRequestedEvent) {
shippingService.processShippingRequest(shipmentRequestedEvent.toDomain());
}
當我們現在運行測試時,它失敗了,我們在堆疊追蹤中看到類似以下的訊息:
Cannot deserialize value of type `java.time.LocalDate` from String "12-05-2024"
這是因為我們沒有使用標準的“yyyy-MM-dd”
日期格式。
為了解決這個問題,我們需要配置一個能夠解析此日期格式的ObjectMapper
。我們可以簡單地將它聲明為@Configuration
註解的類別中的 bean,然後自動配置將其正確設定為自動配置的SqsTemplate
和@SqsListener
方法:
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
JavaTimeModule module = new JavaTimeModule();
LocalDateDeserializer customDeserializer = new LocalDateDeserializer(DateTimeFormatter.ofPattern("dd-MM-yyyy", Locale.getDefault()));
module.addDeserializer(LocalDate.class, customDeserializer);
mapper.registerModule(module);
return mapper;
}
當我們再次運行測試時,它按預期通過。
8. 配置繼承和介面反序列化
另一個常見的場景是具有具有各種子類別或實作的超類別或接口,並且有必要通知框架應根據條件(例如MessageHeader
或訊息的一部分)將訊息反序列化到哪個特定類別。
為了說明此用例,讓我們為場景添加一些複雜性,並包括兩種類型的貨件: InternationalShipment
和DomesticShipment
,每種類型都是具有特定屬性的Shipment
的子類別。
8.1.建立實體和事件
public class InternationalShipment extends Shipment {
private String destinationCountry;
private String customsInfo;
public InternationalShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
String destinationCountry, String customsInfo) {
super(orderId, customerAddress, shipBy, status);
this.destinationCountry = destinationCountry;
this.customsInfo = customsInfo;
}
// ...getters and setters
}
public class DomesticShipment extends Shipment {
private String deliveryRouteCode;
public DomesticShipment(UUID orderId, String customerAddress, LocalDate shipBy, ShipmentStatus status,
String deliveryRouteCode) {
super(orderId, customerAddress, shipBy, status);
this.deliveryRouteCode = deliveryRouteCode;
}
public String getDeliveryRouteCode() {
return deliveryRouteCode;
}
public void setDeliveryRouteCode(String deliveryRouteCode) {
this.deliveryRouteCode = deliveryRouteCode;
}
}
讓我們加入它們各自的事件:
public class DomesticShipmentRequestedEvent extends ShipmentRequestedEvent {
private String deliveryRouteCode;
public DomesticShipmentRequestedEvent(){}
public DomesticShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String deliveryRouteCode) {
super(orderId, customerAddress, shipBy);
this.deliveryRouteCode = deliveryRouteCode;
}
public DomesticShipment toDomain() {
return new DomesticShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, deliveryRouteCode);
}
// ...getters and setters
}
public class InternationalShipmentRequestedEvent extends ShipmentRequestedEvent {
private String destinationCountry;
private String customsInfo;
public InternationalShipmentRequestedEvent(){}
public InternationalShipmentRequestedEvent(UUID orderId, String customerAddress, LocalDate shipBy, String destinationCountry,
String customsInfo) {
super(orderId, customerAddress, shipBy);
this.destinationCountry = destinationCountry;
this.customsInfo = customsInfo;
}
public InternationalShipment toDomain() {
return new InternationalShipment(getOrderId(), getCustomerAddress(), getShipBy(), ShipmentStatus.REQUESTED, destinationCountry,
customsInfo);
}
// ...getters and setters
}
8.2.新增服務和偵聽器邏輯
我們將在Service
中新增兩種方法,每種方法都用於處理不同類型的貨件:
@Service
public class ShipmentService {
// ...previous code stays the same
public void processDomesticShipping(DomesticShipment shipment) {
logger.info("Processing domestic shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.READY_FOR_DISPATCH);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("Domestic shipping processed: {}", shipment.getOrderId());
}
public void processInternationalShipping(InternationalShipment shipment) {
logger.info("Processing international shipping for order: {}", shipment.getOrderId());
shipment.setStatus(ShipmentStatus.CUSTOMS_CHECK);
shippingRepository.put(shipment.getOrderId(), shipment);
logger.info("International shipping processed: {}", shipment.getOrderId());
}
}
現在讓我們新增處理訊息的偵聽器。值得注意的是,我們在偵聽器方法中使用超類別類型,因為該方法從兩個子類型接收訊息:
@SqsListener(queueNames = "${events.queues.shipping.subclass-deserialization-queue}")
public void receiveShippingRequestWithType(ShipmentRequestedEvent shipmentRequestedEvent) {
if (shipmentRequestedEvent instanceof InternationalShipmentRequestedEvent event) {
shippingService.processInternationalShipping(event.toDomain());
} else if (shipmentRequestedEvent instanceof DomesticShipmentRequestedEvent event) {
shippingService.processDomesticShipping(event.toDomain());
} else {
throw new RuntimeException("Event type not supported " + shipmentRequestedEvent.getClass()
.getSimpleName());
}
}
8.3.使用預設類型標頭映射進行反序列化
設定場景後,我們可以建立測試。首先,讓我們建立每種類型的事件:
@Test
void givenPayloadWithSubclasses_whenMessageReceived_thenDeserializesCorrectType() {
var domesticOrderId = UUID.randomUUID();
var domesticEvent = new DomesticShipmentRequestedEvent(domesticOrderId, "123 Main St", LocalDate.parse("2024-05-12"), "XPTO1234");
var internationalOrderId = UUID.randomUUID();
InternationalShipmentRequestedEvent internationalEvent = new InternationalShipmentRequestedEvent(internationalOrderId, "123 Main St", LocalDate.parse("2024-05-24"), "Canada", "HS Code: 8471.30, Origin: China, Value: $500");
}
繼續使用相同的測試方法,我們現在將發送事件。預設情況下, SqsTemplate
會傳送一個標頭,其中包含用於反序列化的特定類型資訊。透過利用這一點,我們可以簡單地使用自動配置的SqsTemplate
發送訊息,並且它正確地反序列化訊息:
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), internationalEvent);
sqsTemplate.send(queuesProperties.getSubclassDeserializationQueue(), domesticEvent);
最後,我們斷言每次運輸的狀態根據其類型對應於適當的狀態:
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
var domesticShipment = (DomesticShipment) shipmentService.getShipment(domesticOrderId);
assertThat(domesticShipment).isNotNull();
assertThat(domesticShipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(domesticEvent);
assertThat(domesticShipment.getStatus()).isEqualTo(ShipmentStatus.READY_FOR_DISPATCH);
var internationalShipment = (InternationalShipment) shipmentService.getShipment(internationalOrderId);
assertThat(internationalShipment).isNotNull();
assertThat(internationalShipment).usingRecursiveComparison()
.ignoringFields("status")
.isEqualTo(internationalEvent);
assertThat(internationalShipment.getStatus()).isEqualTo(ShipmentStatus.CUSTOMS_CHECK);
});
當我們現在運行測試時,它通過了,這表明每個子類別都已使用正確的類型和資訊正確反序列化。
8.4.使用自訂類型標頭映射進行反序列化
從可能不使用SqsTemplate
發送訊息的服務接收訊息是很常見的,或者表示事件的 POJO 或記錄可能位於不同的套件中。
為了模擬這種情況,讓我們在測試方法中建立一個自訂SqsTemplate
,並將其配置為發送標頭中沒有類型資訊的訊息。對於這種情況,我們還需要注入一個能夠序列化LocalDate
實例的ObjectMapper
,例如我們先前配置的實例或 Spring Boot 自動配置的實例:
@Autowired
private ObjectMapper objectMapper;
var customTemplate = SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient)
.configureDefaultConverter(converter -> {
converter.doNotSendPayloadTypeHeader();
converter.setObjectMapper(objectMapper);
})
.build();
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(internationalEvent);
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(domesticEvent);
現在,我們的測試失敗,並顯示與堆疊追蹤中類似的訊息,因為框架無法知道將其反序列化到哪個特定類別:
Could not read JSON: Unrecognized field "destinationCountry"
Could not read JSON: Unrecognized field "deliveryRouteCode"
為了解決這個用例, SqsMessagingMessageConverter
類別具有setPayloadTypeMapper
方法,該方法可用於讓框架根據訊息的任何屬性了解目標類別。對於此測試,我們將使用自訂標頭作為標準。
首先,讓我們將標頭配置加入到application.yml
:
headers:
types:
shipping:
header-name: SHIPPING_TYPE
international: INTERNATIONAL
domestic: DOMESTIC
我們還將建立一個屬性類別來保存這些值:
@ConfigurationProperties(prefix = "headers.types.shipping")
public class ShippingHeaderTypesProperties {
private String headerName;
private String international;
private String domestic;
// ...getters and setters
}
接下來,讓我們在Configuration
類別中啟用屬性類別:
@EnableConfigurationProperties({ ShipmentEventsQueuesProperties.class, ShippingHeaderTypesProperties.class })
@Configuration
public class ShipmentServiceConfiguration {
// ...rest of code remains the same
}
我們現在將配置一個自訂SqsMessagingMessageConverter
以使用這些標頭並將其設為defaultSqsListenerContainerFactory
bean:
@Bean
public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(ObjectMapper objectMapper) {
SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
converter.setPayloadTypeMapper(message -> {
if (!message.getHeaders()
.containsKey(typesProperties.getHeaderName())) {
return Object.class;
}
String eventTypeHeader = MessageHeaderUtils.getHeaderAsString(message, typesProperties.getHeaderName());
if (eventTypeHeader.equals(typesProperties.getDomestic())) {
return DomesticShipmentRequestedEvent.class;
} else if (eventTypeHeader.equals(typesProperties.getInternational())) {
return InternationalShipmentRequestedEvent.class;
}
throw new RuntimeException("Invalid shipping type");
});
converter.setObjectMapper(objectMapper);
return SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(configure -> configure.messageConverter(converter))
.build();
}
之後,我們將標頭新增到測試方法中的自訂範本中:
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(internationalEvent)
.header(headerTypesProperties.getHeaderName(), headerTypesProperties.getInternational()));
customTemplate.send(to -> to.queue(queuesProperties.getSubclassDeserializationQueue())
.payload(domesticEvent)
.header(headerTypesProperties.getHeaderName(), headerTypesProperties.getDomestic()));
當我們再次運行測試時,它通過了,斷言為每個事件反序列化了正確的子類類型。
9. 結論
在本文中,我們介紹了訊息轉換的三個常見用例:使用開箱即用設定的 POJO/記錄序列化和反序列化,使用自訂ObjectMapper
處理不同的日期格式和其他特定配置,以及兩種不同的方式將訊息反序列化到子類別/介面實作。
我們透過設定本地測試環境並創建即時測試來斷言我們的邏輯來測試每個場景。
與往常一樣,本文中使用的完整程式碼可以 在 GitHub 上找到。