如何從 Kafka 中的特定偏移量讀取訊息
1. 概述
Kafka 是一種流行的開源分散式訊息流中間件,它將訊息生產者與訊息消費者解耦。它使用發布-訂閱模式將它們解耦。 Kafka 使用主題分發資訊。每個主題由不同的分片組成,在 Kafka 術語中稱為分區。分區中的每個訊息都有一個特定的偏移量。
在本教程中,我們將討論如何使用kafka-console-consumer.sh
命令列工具從主題分區的特定偏移量讀取。我們在範例中使用的 Kafka 版本是 3.7.0。
2. 分區和偏移量簡要說明
Kafka 將寫入主題的訊息拆分為多個分區。具有相同密鑰的所有訊息都保存在同一分區中。但是,如果沒有金鑰,Kafka 會向隨機分區發送訊息。
Kafka保證分區內訊息的順序,但不保證跨分區的訊息順序。分區中的每個訊息都有一個 ID。這個ID稱為分區偏移量。隨著新訊息附加到分區,分區偏移量不斷增加。
消費者預設從低偏移量到高偏移量的分區讀取訊息。然而,我們可能需要從分區中的特定偏移量開始讀取訊息。我們將在下一節中了解如何實現這一目標。
3. 一個例子
在本節中,我們將了解如何從特定偏移量讀取。我們假設 Kafka 伺服器正在運行,並且已經使用kafka-topics.sh
建立了一個名為test-topic
主題。主題分為三個分區。
Kafka 提供了我們在範例中使用的所有腳本。
3.1.寫訊息
我們使用kafka-console-producer.sh
腳本啟動一個生產者:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>
Kafka 伺服器在localhost
和連接埠9092
上偵聽客戶端連線。因此, –bootstrap-server localhost:9092
選項用於連接到 Kafka 伺服器
在沒有密鑰的情況下編寫主題時,主題僅發送到隨機選擇的分區之一。然而,在我們的範例中,我們希望主題平均分配到所有分區,因此我們使用RoundRobinPartitioner
策略使生產者以循環方式寫入主題。指令的–producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
部分指定了此行為。
箭頭符號>
表明我們已準備好發送訊息。現在讓我們發送六條訊息:
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>Message1
>Message2
>Message3
>Message4
>Message5
>Message6
>
第一則訊息是Message1
,而最後一則訊息是Message6
。我們有三個分區,因此由於循環分區,我們希望Message1
和Message4
位於同一分區中。同樣, Message2
和Message5
以及Message3
和Message6
應該位於另外兩個分區。
3.2.閱讀訊息
現在,我們將從特定偏移量讀取訊息。我們使用kafka-console-consumer.sh
啟動消費者:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --partition 0 --offset 0
Message2
Message5
這裡, –partition 0
**和–offset 0
選項**指定要使用的分區和偏移量。分區和偏移量的編號從0開始。
我們從第一個偏移量開始的第一個分區讀取的訊息是Message2
和Message5
。正如預期的那樣,它們位於同一分區中。 **kafka-console-consumer.sh
**不會退出並繼續運行以讀取新訊息。
可以從第二個偏移量開始讀取第一個分割區中的訊息:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --partition 0 --offset 1
Message5
由於–offset 1
選項,我們在這種情況下只讀取Message5
。我們也可以指定要讀取的訊息數量:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --partition 0 --offset 0 --max-messages 1
Message2
Processed a total of 1 messages
–max-messages
選項指定退出之前要消耗的訊息數。在這種情況下,我們只讀取Message2
,因為我們將–max-messages 1
傳遞給kafka-console-consumer.sh
。 kafka-console-consumer.sh
在讀取所需數量的訊息後退出。否則,它將等待,直到讀取所需數量的消息。
讀取其他兩個分割區的訊息也是同樣的方式:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --partition 1 --offset 0 --max-messages 2
Message1
Message4
Processed a total of 2 messages
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --partition 2 --offset 0 --max-messages 2
Message3
Message6
Processed a total of 2 messages
結果符合預期。
但是,如果使用–offset
傳遞給kafka-console-consumer.sh
的值大於分區中可用訊息的數量,則kafka-console-consumer.sh
會等待,直到訊息寫入該分區並讀取該訊息立即地。
4. 結論
在本文中,我們學習如何使用kafka-console-consumer.sh
命令列工具從主題分區的特定偏移量讀取。
首先,我們了解到分區中的每個訊息都有一個稱為分區偏移量的 ID。通常,Kafka 從具有最低偏移量的訊息開始在分區中傳遞訊息。
然後,我們看到我們可以分別使用kafka-console-consumer.sh
的–partition
和–offset
選項從特定分區和偏移量讀取。此外,我們也了解到–max-messages
選項指定要讀取的訊息數。