消費者延遲處理Kafka訊息
1. 概述
Apache Kafka 是一個事件流平台,可大規模收集、處理、儲存和整合資料。有時,我們可能會想要延遲處理來自 Kafka 的訊息。一個例子是客戶訂單處理系統,設計用於在延遲 X 秒後處理訂單,並在此時間範圍內容納取消訂單。
在本文中,我們將探索使用 Spring Kafka 的消費者延遲處理 Kafka 訊息。儘管 Kafka 沒有為訊息的延遲消費提供開箱即用的支持,但我們將研究替代的實作選項。
2. 應用上下文
Kafka 提供了多種重試錯誤的方法。我們將使用此重試機制來延遲消費者對訊息的處理。因此,有必要了解 Kafka 重試的工作原理。
讓我們考慮一個訂單處理應用程序,客戶可以在其中的 UI 上下訂單。用戶可以在10秒內取消錯誤下的訂單。這些訂單轉到 Kafka 主題web.orders
,我們的應用程式在那裡處理它們。
外部服務公開最新的訂單狀態( CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED
)。我們的應用程式需要接收訊息,等待 10 秒,然後與外部服務檢查訂單是否處於CONFIRMED
狀態,即用戶在 10 秒內沒有取消訂單。
為了進行測試,從web.orders.internal
收到的內部訂單不應延遲。
讓我們加入一個簡單的Order
模型,其中orderGeneratedDateTime
由生產者填充, orderProcessedTime
在延遲持續時間後由消費者填充:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private UUID orderId;
private LocalDateTime orderGeneratedDateTime;
private LocalDateTime orderProcessedTime;
private List<String> address;
private double price;
}
3.Kafka監聽器和外部服務
接下來,我們將新增一個用於主題消費的偵聽器和一個公開訂單狀態的服務。
讓我們加入一個[KafkaListener](https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html)
,它讀取並處理來自主題web.orders
和web.internal.orders
訊息:
@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
public void handleOrders(String order) throws JsonProcessingException {
Order orderDetails = objectMapper.readValue(order, Order.class);
OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
orderService.processOrder(orderDetails);
}
}
包含KafkaBackoffException
非常重要,以便偵聽器允許重試。為簡單起見,我們假設外部OrderService
始終傳回訂單狀態為CONFIRMED.
此外, processOrder()
方法將訂單處理時間設定為當前時間並將訂單儲存到HashMap
中:
@Service
public class OrderService {
HashMap<UUID, Order> orders = new HashMap<>();
public Status findStatusById(UUID orderId) {
return Status.ORDER_CONFIRMED;
}
public void processOrder(Order order) {
order.setOrderProcessedTime(LocalDateTime.now());
orders.put(order.getOrderId(), order);
}
}
4. 自訂延遲訊息監聽器
Spring-Kafka 提出了KafkaBackoffAwareMessageListenerAdapter
,它擴展了AbstractAdaptableMessageListener
並實現了AcknowledgingConsumerAwareMessageListener
。此適配器檢查退避dueTimestamp
標頭,並透過呼叫KafkaConsumerBackoffManager
退避訊息或重試處理。
現在讓我們實作類似KafkaBackoffAwareMessageListenerAdapter
的DelayedMessageListenerAdapter
。此適配器應該能夠靈活地配置每個主題的延遲以及0
秒的預設延遲:
public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>>
implements AcknowledgingConsumerAwareMessageListener<K, V> {
// Field declaration and constructor
public void setDelayForTopic(String topic, Duration delay) {
Objects.requireNonNull(topic, "Topic cannot be null");
Objects.requireNonNull(delay, "Delay cannot be null");
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
this.delaysPerTopic.put(topic, delay);
}
public void setDefaultDelay(Duration delay) {
Objects.requireNonNull(delay, "Delay cannot be null");
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
this.defaultDelay = delay;
}
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord,
consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
.toMillis(), consumer));
invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
}
private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp,
this.listenerId,
new TopicPartition(data.topic(), data.partition()), consumer);
}
}
對於每個傳入訊息,此適配器首先接收記錄並檢查為主題設定的延遲。這將在配置中設置,如果未設置,則使用預設延遲。
KafkaConsumerBackoffManager#backOffIfNecessary
方法的現有實作檢查上下文記錄時間戳記和目前時間戳記之間的差異。如果差異為正,表示沒有消耗,分區將暫停並引發KafkaBackoffException
。否則,它將記錄傳送到KafkaListener
方法進行消費。
5. 監聽器配置
[ConcurrentKafkaListenerContainerFactory](https://docs.spring.io/spring-kafka/api/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.html)
是 Spring Kafka 的預設實現,負責為KafkaListener
建構容器。它允許我們配置並發KafkaListener
實例的數量。每個容器都可以被視為一個邏輯線程池,其中每個線程負責監聽來自一個或多個 Kafka 主題的訊息。
DelayedMessageListenerAdapter
需要透過聲明自訂ConcurrentKafkaListenerContainerFactory.
我們可以為特定主題(例如web.orders
設定延遲,也可以為任何其他主題設定預設延遲0
:
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory,
ListenerContainerRegistry registry, TaskScheduler scheduler) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.RECORD);
factory.setContainerCustomizer(container -> {
DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
delayedAdapter.setDefaultDelay(Duration.ZERO);
container.setupMessageListener(delayedAdapter);
});
return factory;
}
@SuppressWarnings("unchecked")
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager,
ConcurrentMessageListenerContainer<Object, Object> container) {
return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
.getMessageListener(), backOffManager, container.getListenerId());
}
private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
return new ContainerPartitionPausingBackOffManager(registry,
new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
}
值得注意的是,在RECORD
層級設定確認模式對於確保消費者在處理過程中發生錯誤時重新傳遞訊息至關重要。
最後,我們需要定義一個TaskScheduler
bean 以在延遲持續時間後恢復暫停的分割區,並且需要將該排程器注入到DelayedMessageListenerAdapter
將使用的BackOffManager
中:
@Bean
public TaskScheduler taskScheduler() {
return new ThreadPoolTaskScheduler();
}
6. 測試
讓我們確保web.orders
主題上的訂單在通過測試進行處理之前經歷 10 秒的延遲:
@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
// Given
var orderId = UUID.randomUUID();
Order order = Order.builder()
.orderId(orderId)
.price(1.0)
.orderGeneratedDateTime(LocalDateTime.now())
.address(List.of("41 Felix Avenue, Luton"))
.build();
String orderString = objectMapper.writeValueAsString(order);
ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
// When
testKafkaProducer.send(record)
.get();
await().atMost(Duration.ofSeconds(1800))
.until(() -> {
// then
Map<UUID, Order> orders = orderService.getOrders();
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds() >= 10;
});
}
接下來,我們將測試web.internal.orders
的所有訂單是否遵循預設的0
秒延遲:
@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
// Given
var orderId = UUID.randomUUID();
Order order = Order.builder()
.orderId(orderId)
.price(1.0)
.orderGeneratedDateTime(LocalDateTime.now())
.address(List.of("41 Felix Avenue, Luton"))
.build();
String orderString = objectMapper.writeValueAsString(order);
ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
// When
testKafkaProducer.send(record)
.get();
await().atMost(Duration.ofSeconds(1800))
.until(() -> {
// Then
Map<UUID, Order> orders = orderService.getOrders();
System.out.println("Time...." + Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds());
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds() <= 1;
});
}
七、結論
在本教程中,我們探討了 Kafka 消費者如何以固定時間間隔延遲處理訊息。
我們可以修改實現,透過利用嵌入的訊息持續時間作為訊息的一部分來動態設定處理延遲。
與往常一樣,範例的原始程式碼可在 GitHub 上取得。