Kafka 中的消費者搜索
1. 概述
Kafka 中的查找類似於在讀取之前在磁碟上定位儲存的資料。在從分區讀取資料之前,我們必須先尋找到正確的位置。
Kafka 消費者偏移量是一個獨特的、穩定成長的數字,標示事件記錄在分區中的位置。組中的每個消費者都為每個分區保留自己的偏移量以追蹤進度。
出於重播事件或跳到最新消息等原因,消費者可能需要處理分區中不同位置的消息。
在本教學中,我們將探索 Spring Kafka API 方法來擷取分區內各個位置的訊息。
2.使用Java API進行查找
在大多數情況下,消費者從分區的開頭讀取訊息並繼續偵聽新訊息。然而,在某些情況下,我們可能需要從特定位置、時間或相對位置進行讀取。
讓我們探索一個 API,它提供不同的端點,透過指定偏移量或從開頭或結尾讀取來從分區檢索記錄。
2.1.按偏移量尋找
Spring Kafka提供了一個seek()
方法來將讀取器定位在分區內給定的偏移量處。
讓我們先透過取得分區和偏移值來探索在分區內按偏移量進行查找:
@GetMapping("partition/{partition}/offset/{offset}")
public ResponseEntity<Response> getOneByPartitionAndOffset(@PathVariable("partition") int partition,
@PathVariable("offset") int offset) {
try (KafkaConsumer<String, String> consumer =
(KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
if (recordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = recordIterator.next();
Response response = new Response(consumerRecord.partition(),
consumerRecord.offset(), consumerRecord.value());
return new ResponseEntity<>(response, HttpStatus.OK);
}
}
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
這裡,API 公開了一個端點partition/{partition}/offset/{offset
},它將主題、分區和偏移量傳遞給seek()
方法,定位消費者以在指定位置檢索訊息。回應模型包括分區、偏移量和訊息內容:
public record Response(int partition, long offset, String value) { }
為簡單起見,API 僅檢索指定位置的一筆記錄。但是,我們可以修改它以恢復從該偏移量開始的所有訊息。它也不處理給定偏移量不可用的情況。
為了測試這一點,作為第一步,我們添加一個在所有測試之前運行的方法,在指定主題中產生 5 個簡單訊息:
@BeforeAll
static void beforeAll() {
// set producer config for the broker
testKafkaProducer = new KafkaProducer<>(props);
int partition = 0;
IntStream.range(0, 5)
.forEach(m -> {
String key = String.valueOf(new Random().nextInt());
String value = "Message no : %s".formatted(m);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME,
partition,
key,
value
);
try {
testKafkaProducer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
這裡設定了生產者配置,並向分區0
發送5
格式為“Message no : %s”.formatted(m)
訊息,其中m
表示0
到4
之間的整數。
接下來,讓我們新增一個測試,透過傳遞分區0
和偏移量2:
@Test
void givenKafkaBrokerExists_whenSeekByPartition_thenMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/offset/2")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"partition\":0,\"offset\":2,\"value\":\"Message no : 2\"}");
}
透過呼叫此 API 端點,我們可以看到位於偏移量2
第三條訊息已成功接收。
2.2.從頭開始尋找
seekToBeginning()
方法將消費者定位在分區的開頭,允許它從第一個訊息開始檢索訊息。
接下來,讓我們新增一個端點,該端點在分割區的開頭公開第一個訊息:
@GetMapping("partition/{partition}/beginning")
public ResponseEntity<Response> getOneByPartitionToBeginningOffset(@PathVariable("partition") int partition) {
try (KafkaConsumer<String, String> consumer =
(KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seekToBeginning(Collections.singleton(topicPartition));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
if (recordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = recordIterator.next();
Response response = new Response(consumerRecord.partition(),
consumerRecord.offset(), consumerRecord.value());
return new ResponseEntity<>(response, HttpStatus.OK);
}
}
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
在這裡,API 提供端點partition/{partition}/beginning
,將主題和分區傳遞給seekToBeginning()
方法。這使消費者能夠從分割區的開頭讀取訊息。回應包括分區、偏移量和訊息內容。
接下來,我們新增一個測試來檢索分區0
開頭的訊息。請注意,測試的@BeforeAll
部分確保生產者將 5 個訊息推送到測試主題:
@Test
void givenKafkaBrokerExists_whenSeekByBeginning_thenFirstMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/beginning")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"partition\":0,\"offset\":0,\"value\":\"Message no : 0\"}");
}
我們可以透過呼叫此 API 端點來檢索儲存在偏移量0
處的第一個訊息。
2.3.按末尾查找
seekToEnd()
方法將使用者定位在分區的末尾,允許它檢索任何附加的未來訊息。
接下來,讓我們建立一個端點來尋找分區末端的偏移位置:
@GetMapping("partition/{partition}/end")
public ResponseEntity<Long> getOneByPartitionToEndOffset(@PathVariable("partition") int partition) {
try (KafkaConsumer<String, String> consumer =
(KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seekToEnd(Collections.singleton(topicPartition));
return new ResponseEntity<>(consumer.position(topicPartition), HttpStatus.OK);
}
}
此API提供端點partition/{partition}/end
,將主題和分區傳遞給seekToEnd()
方法。這使消費者能夠從分區末尾讀取訊息。
由於尋找末尾意味著沒有可用的新訊息,因此該 API 會顯示分區內的當前偏移位置。讓我們新增一個測試來驗證這一點:
@Test
void givenKafkaBrokerExists_whenSeekByEnd_thenLastMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/end")
.exchange()
.expectStatus()
.isOk()
.expectBody(Long.class)
.isEqualTo(5L);
}
使用seekToEnd()
將消費者移至將寫入以下訊息的下一個偏移量,將其放置在最後一個可用訊息之外的一個位置。當我們呼叫此 API 端點時,回應將傳回最後一個偏移位置加一。
2.4.透過實現ConsumerSeekAware
類別進行查找
除了使用消費者 API 讀取特定位置的訊息之外,我們還可以擴展 Spring Kafka 中的AbstractConsumerSeekAware
類別。此類別允許消費者動態控制 Kafka 分區中的查找。它提供了在分區分配期間尋找特定偏移量或時間戳記的方法,從而更好地控制訊息消耗。
除了上述查找方法外, AbstractConsumerSeekAware
還提供從特定時間戳或相對位置找到的功能。
讓我們探討一下本節中的相對位置查找:
void seekRelative(java.lang.String topic, int partition, long offset, boolean toCurrent)
Spring Kafka中的[seekRelative()](https://javadoc.io/doc/org.springframework.kafka/spring-kafka/2.6.9/org/springframework/kafka/listener/ConsumerSeekAware.ConsumerSeekCallback.html#seekRelative(java.lang.String,int,long,boolean))
方法允許消費者在分區內尋找相對於當前或開始偏移量的位置。每個參數都有特定的作用:
-
topic
:從中讀取訊息的 Kafka 主題的名稱 -
partition
:主題內將發生查找的分區號 -
offset
:相對於目前或起始偏移量要移動的位置數。這可以是正數或負數 -
toCurrent
:布林值。如果true
,則該方法將相對於目前偏移量進行查找。如果為false
,則相對於分割區的開頭進行查找
讓我們新增一個自訂偵聽器,它使用seekRelative()
API 來尋找分區內的最新消息:
@Component
class ConsumerListener extends AbstractConsumerSeekAware {
public static final Map<String, String> MESSAGES = new HashMap<>();
@Override
public void onPartitionsAssigned(Map<TopicPartition,
Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet()
.forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, false));
}
@KafkaListener(id = "test-seek", topics = "test-seek-topic")
public void listen(ConsumerRecord<String, String> in) {
MESSAGES.put(in.key(), in.value());
}
}
onPartitionsAssigned
方法中呼叫seekRelative()
方法,在接收到分區分配時手動調整消費者的位置。
偏移值-1告訴消費者從參考點向後移動一個位置。在這種情況下,由於toCurrent
設定為 false,它告訴消費者相對於分區末端進行查找。這意味著消費者從最後一條可用訊息後移一個位置。
記憶體中的哈希映射追蹤讀取的訊息以進行測試,並將接收到的訊息儲存為字串。
最後,我們新增一個測試,透過檢查映射來驗證系統是否成功檢索到偏移量4
的消息:
@Test
void givenKafkaBrokerExists_whenMessagesAreSent_ThenLastMessageShouldBeRetrieved() {
Map<String, String> messages = consumerListener.MESSAGES;
Assertions.assertEquals(1, messages.size());
Assertions.assertEquals("Message no : 4", messages.get("4"));
}
測試的@BeforeAll
部分確保生產者向測試主題推送 5 個訊息。尋找配置成功檢索分區中的最後一則訊息。
三、結論
在本教程中,我們探討了 Kafka 消費者如何使用 Spring Kafka 尋找分區中的特定位置。
我們首先研究了使用消費者 API 進行查找,這在需要精確控制分區中的讀取位置時非常有用。此方法最適合重播事件、跳過某些訊息或套用基於偏移量的自訂邏輯等場景。
接下來,我們研究了使用偵聽器時的查找,這更適合連續消費訊息。此方法在處理後定期自動提交偏移量,因為預設情況下, enable.auto.commit
屬性設定為true
。
與往常一樣,範例的原始程式碼可在 GitHub 上取得。