使用@KafkaListener在Kafka中大量消費訊息
1. 概述
在本教程中,我們將討論使用 Spring Kafka 函式庫的@KafkaListener
註解批次處理 Kafka 訊息。 Kafka Broker 是一個中間件,有助於持久保存來自來源系統的訊息。目標系統配置為定期輪詢 Kafka 主題/隊列,然後從中讀取訊息。
這可以防止目標系統或服務關閉時訊息遺失。當目標服務恢復時,它們會繼續接受未處理的訊息。因此,這種類型的架構有助於提高訊息的持久性,從而提高系統的容錯能力。
2. 為什麼要批次處理訊息?
多個來源或事件生產者同時向同一個 Kafka 佇列或主題發送訊息是很常見的。因此,其中會累積大量訊息。如果目標服務或消費者在一個會話中接收到如此大量的訊息,他們可能無法有效地處理它們。
這可能會產生連鎖效應,從而導致瓶頸。最終,這會影響所有下游進程,具體取決於訊息。因此,消費者或訊息監聽者應該限制他們在某一時間點可以處理的訊息數量。
要以批次模式運行,我們必須考慮主題上發布的資料量和應用程式的容量來配置正確的批次大小。此外,消費者應用程式應設計為批次處理訊息以滿足 SLA。
此外,如果沒有批次處理,消費者必須定期輪詢 Kafka 主題才能單獨獲取訊息。這種方法給計算資源帶來了壓力。因此,批次處理比每次輪詢處理單一訊息要高效得多。
然而,批次在某些情況下可能不適合:
- 消息量小
- 立即處理對於時間敏感的應用至關重要
- 計算和記憶體資源存在限制
- 嚴格的訊息排序至關重要
3.使用@KafkaListener
註解進行批次處理
為了理解批次處理,我們先定義一個用例。然後我們將首先使用基本訊息處理然後批次處理來實現它。這樣,我們就能更好地體會到批次處理訊息的重要性。
3.1.用例描述
我們假設許多關鍵 IT 基礎設施設備(例如伺服器和網路設備)都在公司的資料中心中運作。多種監控工具可追蹤這些設備的 KPI(關鍵績效指標)。由於營運團隊希望進行主動監控,因此他們期望即時可操作的分析。因此,存在嚴格的 SLA 來將 KPI 傳輸到目標分析應用程式。
營運團隊配置監控工具以定期將 KPI 發送到 Kafka 主題。消費者應用程式從主題讀取訊息,然後將它們推送到資料湖。應用程式從資料湖讀取資料並產生即時分析。
讓我們實現一個配置和不配置批次的消費者。我們將分析兩種實現的差異和結果。
3.2.先決條件
在開始實作批次處理之前,了解 Spring Kafka 庫至關重要。幸運的是,我們在《Apache Kafka 與 Spring 簡介》一文中討論了這個主題,它為我們提供了急需的動力。
出於學習目的,我們需要一個 Kafka 實例。因此,為了快速入門,我們將使用嵌入式 Kafka 。
最後,我們需要一個程序,在 Kafka 代理中建立事件佇列並定期向其發布範例訊息。本質上,我們將使用 Junit5 來理解這個概念。
3.3.基本監聽器
讓我們從一個基本的監聽器開始,它從 Kafka 代理程式中一條一條地讀取訊息。我們將在KafkaKpiConsumerWithNoBatchConfig
配置類別中定義ConcurrentKafkaListenerContainerFactory
bean:
public class KafkaKpiConsumerWithNoBatchConfig {
@Bean(name = "kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBasicListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
kafkaKpiBasicListenerContainerFactory()
方法傳回kafkaKpiListenerContainerFactory
bean。該 bean 幫助配置一個基本偵聽器,它一次可以處理一則訊息:
@Component
public class KpiConsumer {
private CountDownLatch latch = new CountDownLatch(1);
private ConsumerRecord<String, String> message;
@Autowired
private DataLakeService dataLakeService;
@KafkaListener(
id = "kpi-listener",
topics = "kpi_topic",
containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) throws InterruptedException {
this.message = record;
latch.await();
List<String> messages = new ArrayList<>();
messages.add(record.value());
dataLakeService.save(messages);
//reset the latch
latch = new CountDownLatch(1);
}
//General getter methods
}
我們已將@KafkaListener
註解應用於listen()
方法。此註解有助於設定偵聽器主題和偵聽器容器工廠 bean。 KpiConsumer
類別中的java.util.concurrent.CountDownLatch
物件有助於控制 Junit5 測試中的訊息處理。我們將用它來理解整個概念。
CountDownLatch#await()
方法暫停偵聽器線程,並在測試方法呼叫CountDownLatch#countDown()
方法時恢復該執行緒。如果沒有這一點,理解和追蹤訊息就會很困難。最後,下游DataLakeService#save()
方法接收一則訊息進行處理。
現在讓我們來看看幫助追蹤KpiListener
類別處理的訊息的方法:
@RepeatedTest(10)
void givenKafka_whenMessage1OnTopic_thenListenerConsumesMessages(RepetitionInfo repetitionInfo) {
String testNo = String.valueOf(repetitionInfo.getCurrentRepetition());
assertThat(kpiConsumer.getMessage().value()).isEqualTo("Test KPI Message-".concat(testNo));
kpiConsumer.getLatch().countDown();
}
當監控工具將 KPI 訊息發佈到kpi_topic
Kafka 主題時,偵聽器會依照到達順序接收它們。
每次執行該方法時,它都會追蹤到達KpiListener#listen()
方法的訊息。確認訊息順序後,釋放鎖存器,監聽器完成處理。
3.4.能夠進行批次處理的監聽器
現在,讓我們探討 Kafka 中的批次支援。我們首先在 Spring 設定類別中定義ConcurrentKafkaListenerContainerFactory
bean:
@Bean(name="kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBatchListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory();
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
consumerFactory.updateConfigs(configProps);
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setPollTimeout(3000);
factory.setBatchListener(true);
return factory;
}
此方法to the kafkaKpiBasicListenerContainerFactory()
方法類似.
我們透過呼叫ConsumerFactory#setBatchListener()
方法啟用了批次。
此外,我們也藉助ConsumerConfig.MAX_POLL_RECORDS_CONFIG
屬性設定了每次輪詢的最大訊息數。 ConsumerFactory#setConcurrency()
可協助設定並發消費者執行緒的數量,同時處理訊息。其他配置我們可以參考Spring Kafka官方網站。
此外,還有**ConsumerConfig.DEFAULT_FETCH_MAX_BYTES
等配置屬性和**ConsumerConfig.DEFAULT_FETCH_MIN_BYTES
也可以幫助限制訊息大小。
現在,讓我們來看看消費者:
@Component
public class KpiBatchConsumer {
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
private DataLakeService dataLakeService;
private List<String> receivedMessages = new ArrayList<>();
@KafkaListener(
id = "kpi-batch-listener",
topics = "kpi_batch_topic",
batch = "true",
containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecords<String, String> records) throws InterruptedException {
records.forEach(record -> receivedMessages.add(record.value()));
latch.await();
dataLakeService.save(receivedMessages);
latch = new CountDownLatch(1);
}
// Standard getter methods
}
KpiBatchConsumer
與先前定義的KpiConsumer
類別類似,只是@KafkaListener
註解多了一個batch
屬性。 listen()
方法採用ConsumerRecords
類型的參數而不是ConsumerRecord
。我們可以迭代ConsumerRecords
物件來取得批次中的所有ConsumerRecord
元素。
偵聽器還可以按照訊息到達的順序處理批次接收的訊息。然而,在 Kafka 中跨主題分區維護訊息批次的順序非常複雜。
這裡ConsumerRecord
代表發佈到Kafka主題的訊息。最終,我們使用更多訊息呼叫DataLakeService#save()
方法。最後, CountDownLatch
類別的作用與我們之前看到的相同。
假設有一百個 KPI 訊息推送到kpi_batch_topic
Kafka 主題。我們現在可以檢查正在運行的偵聽器:
@RepeatedTest(5)
void givenKafka_whenMessagesOnTopic_thenListenerConsumesMessages() {
int messageSize = kpiBatchConsumer.getReceivedMessages().size();
assertThat(messageSize % 20).isEqualTo(0);
kpiBatchConsumer.getLatch().countDown();
}
與一則地拾取訊息的基本偵聽器不同,這次偵聽器KpiBatchConsumer#listen()
方法接收包含20
KPI 訊息的批次。
4. 結論
在本文中,我們討論了基本 Kafka 偵聽器和啟用批次的偵聽器之間的差異。批次處理有助於同時處理多個訊息以提高應用程式效能。然而,對批次量和訊息大小的適當限制對於控制應用程式的效能非常重要。因此,必須在仔細和嚴格的基準測試過程之後對它們進行最佳化。
本文中使用的源代碼可在 GitHub 上取得。