使用 ReplyingKafkaTemplate 與 Apache Kafka 進行同步通信
1. 概述
Apache Kafka 已成為建構事件驅動架構的最受歡迎和最廣泛使用的訊息傳遞系統之一,其中一個微服務將訊息發佈到某個主題,另一個微服務則非同步使用和處理該訊息。
然而,在某些情況下,發布者微服務需要立即回應才能繼續進一步處理。雖然 Kafka 本質上是為非同步通訊而設計的,但可以將其配置為透過單獨的主題支援同步請求-答覆通訊。
在本教程中,我們將探討如何使用 Apache Kafka 在 Spring Boot 應用程式中實現同步請求-答覆通訊。
2. 設定項目
為了演示,我們將模擬一個通知調度系統。我們將創建一個 Spring Boot 應用程序,它將充當生產者和消費者。
2.1.依賴項
讓我們先將Spring Kafka 依賴項新增到專案的pom.xml
檔案中:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.4</version>
</dependency>
這種依賴關係為我們提供了建立連線和與配置的 Kafka 實例互動所需的類別。
2.2.定義請求-答覆訊息
接下來,讓我們定義兩個記錄來表示我們的請求和回覆訊息:
record NotificationDispatchRequest(String emailId, String content) {
}
public record NotificationDispatchResponse(UUID notificationId) {
}
這裡, NotificationDispatchRequest
記錄保存通知的emailId
和content
,而NotificationDispatchResponse
記錄包含處理請求後產生的唯一notificationId
。
2.3.定義 Kafka 主題和配置屬性
現在,讓我們定義我們的請求和回覆Kafka主題。此外,我們將配置從消費者元件接收回覆的逾時時間。
我們將這些屬性儲存在專案的application.yaml
檔案中,並使用@ConfigurationProperties
將值對應到 Java 記錄,我們的配置和服務層可以引用該記錄:
@Validated
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
record SynchronousKafkaProperties(
@NotBlank
String requestTopic,
@NotBlank
String replyTopic,
@NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
Duration replyTimeout
) {
}
我們還添加了驗證註釋以確保所有必需的屬性都配置正確。如果任何定義的驗證失敗,Spring ApplicationContext
將無法啟動。這使我們能夠符合快速失敗原則。
下面是我們的application.yaml
檔案的片段,它定義了將自動對應到我們的SynchronousKafkaProperties
記錄的必要屬性:
com:
baeldung:
kafka:
synchronous:
request-topic: notification-dispatch-request
reply-topic: notification-dispatch-response
reply-timeout: 30s
在這裡,我們配置請求和回覆 Kafka 主題名稱以及三十秒的reply-timeout
時。
除了自訂屬性之外,我們還在application.yaml
檔案中加入一些核心 Kafka 配置屬性:
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: synchronous-kafka-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: com.baeldung.kafka.synchronous
properties:
allow:
auto:
create:
topics: true
首先,為了允許我們的應用程式連接到已配置的 Kafka 實例,我們使用環境變數來配置其引導伺服器 URL。
接下來,我們為consumer
和producer
配置鍵和值的序列化和反序列化屬性。此外,對於我們的consumer
,我們配置一個group-id
並信任包含我們的請求-答覆記錄的套件以進行JSON反序列化。
配置上述屬性後,Spring Kafka 會自動為我們建立ConsumerFactory
和ProducerFactory
類型的 bean 。我們將在下一節中使用它們來定義額外的 Kafka 配置 bean。
最後,我們啟用主題的自動建立功能,因此如果主題不存在,Kafka 就會自動建立主題。值得注意的是,我們僅為了演示而啟用了此屬性 - 在生產應用程式中不應這樣做。
2.4.定義 Kafka 配置 Bean
有了配置屬性,讓我們定義必要的 Kafka 配置 bean:
@Bean
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
) {
String replyTopic = synchronousKafkaProperties.replyTopic();
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
首先,我們注入ConsumerFactory
實例並將其與配置的replyTopic
一起使用來建立KafkaMessageListenerContainer
bean。這個 bean 負責建立一個從我們的回覆主題輪詢訊息的容器。
接下來,我們將定義在服務層中用於執行同步通訊的核心 bean:
`@Bean
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
ProducerFactory<String, NotificationDispatchRequest> producerFactory,
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
) {
Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
return replyingKafkaTemplate;
}`
使用ProducerFactory
和先前定義的KafkaMessageListenerContainer
bean,我們建立一個ReplyingKafkaTemplate
bean。此外,使用自動連線的synchronousKafkaProperties,
我們設定了在application.yaml
檔案中定義的reply-timeout,這將決定我們的服務在逾時之前等待回應的時間。
這個ReplyingKafkaTemplate
bean 管理請求和回應主題之間的交互,使得透過 Kafka 進行同步通信成為可能。
最後,讓我們定義 bean 以使我們的監聽器元件能夠將回應傳回回覆主題:
`@Bean
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
factory.setConsumerFactory(consumerFactory);
factory.setReplyTemplate(kafkaTemplate);
return factory;
}`
首先,我們使用ProducerFactory
bean 來建立一個標準的KafkaTemplate
bean。
然後,我們將它與ConsumerFactory
bean 一起用來定義KafkaListenerContainerFactory
bean。該 bean 使得我們的監聽器元件能夠在完成所需的處理之後,使用來自請求主題的訊息,並向回應主題發送訊息。
3.使用Kafka實現同步通信
配置完成後,讓我們在兩個配置的 Kafka 主題之間實現同步請求-答覆通訊。
3.1.使用ReplyingKafkaTemplate
發送和接收訊息
首先,讓我們建立一個NotificationDispatchService
類,該類別使用我們先前定義的ReplyingKafkaTemplate
bean 將訊息傳送到配置的請求主題:
`@Service
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
class NotificationDispatchService {
private final SynchronousKafkaProperties synchronousKafkaProperties;
private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;
// standard constructor
NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
String requestTopic = synchronousKafkaProperties.requestTopic();
ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);
var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
return requestReplyFuture.get().value();
}
}`
在這裡,在我們的dispatch()
方法中,我們使用自動連接的synchronousKafkaProperties
實例來提取在我們的application.yaml
檔案中配置的requestTopic
。然後,我們將它與方法參數中傳遞的notificationDispatchRequest
一起使用來建立ProducerRecord
實例。
接下來我們將建立的ProducerRecord
實例傳遞給sendAndReceive()
方法,以將訊息發佈到請求主題。該方法傳回一個RequestReplyFuture
對象,我們使用它等待回應然後返回其值。
在底層,當我們呼叫sendAndReceive()
方法時, ReplyingKafkaTemplate
類別會產生一個唯一的關聯 ID(一個隨機 UUID),並將其附加到傳出訊息的標頭中。此外,它還添加了一個包含回應主題名稱的標題,它希望在該標題中收到回應。請記住,我們已經在KafkaMessageListenerContainer
bean 中設定了回應主題。
ReplyingKafkaTemplate
bean 使用產生的關聯 ID 作為鍵將RequestReplyFuture
物件儲存在線程安全的ConcurrentHashMap
中。這使得它即使在多線程環境中也能工作並支援並發請求。
3.2.定義 Kafka 訊息監聽器
接下來,為了完成我們的實現,讓我們建立一個監聽器元件,用於監聽配置的請求主題中的訊息並將回應發送回回應主題:
@Component
class NotificationDispatchListener {
@SendTo
@KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
// ... processing logic
UUID notificationId = UUID.randomUUID();
return new NotificationDispatchResponse(notificationId);
}
}
我們使用@KafkaListener
註解來監聽我們在application.yaml
檔案中設定的請求主題。
在我們的listen()
方法中,我們只傳回一個包含唯一notificationId
的NotificationDispatchResponse
記錄。
重要的是,我們用@SendTo
註解來註解我們的方法,它指示Spring Kafka從訊息頭中提取關聯ID和回應主題名稱。它使用它們自動將方法的回傳值傳送到提取的回應主題,並將相同的關聯ID新增到訊息標頭。
這允許我們的NotificationDispatchService
類別中的ReplyingKafkaTemplate
bean 使用它最初產生的關聯 ID 來取得正確的RequestReplyFuture
物件。
4. 結論
在本文中,我們探討如何使用 Apache Kafka 實作 Spring Boot 應用程式中兩個元件之間的同步通訊。
我們完成了必要的配置並模擬了通知調度系統。
透過使用ReplyingKafkaTemplate
,我們可以將 Apache Kafka 的非同步特性轉換為同步請求-答覆模式。這種方法有點不合常規,因此在投入生產之前仔細評估它是否符合專案架構非常重要。
與往常一樣,本文使用的所有程式碼範例均可在 GitHub 上找到。