與 Kafka Producer 重試
1. 概述
在這篇簡短的文章中,我們將探討KafkaProducer's
重試機制以及如何調整其設定以適應特定的用例。
我們將討論關鍵屬性及其預設值,然後為我們的範例自訂它們。
2. 預設配置
KafkaProducer
的預設行為是在訊息未被代理確認時重試發布。為了證明這一點,我們可以透過故意錯誤配置主題設定來導致生產者失敗。
首先,讓我們將[kafka-clients](https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients)
依賴項加入pom.xml
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>
現在,我們需要模擬 Kafka Broker 拒絕生產者發送的訊息時的用例。例如,我們可以使用「 min.insync.replicas
」主題配置,它在寫入被視為成功之前驗證最小數量的副本是否同步。
讓我們建立一個主題並將該屬性設為2
,即使我們的測試環境僅包含一個 Kafka 代理。因此,新消息總是被拒絕,這使我們能夠測試生產者的重試機制:
@Test
void givenDefaultConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
NewTopic newTopic = new NewTopic("test-topic-1", 1, (short) 1)
.configs(Map.of("min.insync.replicas", "2"));
adminClient.createTopics(singleton(newTopic)).all().get();
// publish message and verify exception
}
然後,我們創建一個KafkaProducer,
向該主題發送一條訊息,並驗證它重試多次,最終在兩分鐘後超時:
@Test
void givenDefaultConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
// set topic config
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic-1", "test-value");
assertThatThrownBy(() -> producer.send(record).get())
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(org.apache.kafka.common.errors.TimeoutException.class)
.hasMessageContaining("Expiring 1 record(s) for test-topic-1-0");
}
從異常和日誌中我們可以觀察到,生產者多次嘗試發送訊息,最終在兩分鐘後逾時。此行為與KafkaProducer
的預設設定一致:
-
retries
(預設為Integer.MAX_VALUE
):嘗試發布訊息的最大次數 -
delivery.timeout.ms
(預設為120,000):在認為訊息失敗之前等待訊息被確認的最長時間 -
retry.backoff.ms
(預設為100):重試之前等待的時間 -
retry.backoff.max.ms
(預設為1,000):連續重試之間的最大延遲
3. 自訂重試配置
不用說,我們可以調整KafkaProducer
重試配置,以便更好地滿足我們的需求。
例如,我們可以將最大傳遞時間設定為 5 秒,在重試之間使用500
毫秒的延遲,並將最大重試次數降低到20
:
@Test
void givenCustomConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
// set topic config
Properties props = new Properties();
// other properties
props.put(RETRIES_CONFIG, 20);
props.put(RETRY_BACKOFF_MS_CONFIG, "500");
props.put(DELIVERY_TIMEOUT_MS_CONFIG, "5000");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic-2", "test-value");
assertThatThrownBy(() -> producer.send(record).get())
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(org.apache.kafka.common.errors.TimeoutException.class)
.hasMessageContaining("Expiring 1 record(s) for test-topic-2-0");
}
正如預期的那樣,生產者在自訂超時五秒後停止重試。日誌顯示重試之間有500
毫秒的延遲,並確認重試計數從 20 開始,並隨著每次嘗試而減少:
12:57:19.599 [kafka-producer-network-thread | producer-1] WARN oakcproducer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 5 on topic-partition test-topic-2-0, retrying (19 attempts left). Error: NOT_ENOUGH_REPLICAS
12:57:20.107 [kafka-producer-network-thread | producer-1] WARN oakcproducer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 6 on topic-partition test-topic-2-0, retrying (18 attempts left). Error: NOT_ENOUGH_REPLICAS
12:57:20.612 [kafka-producer-network-thread | producer-1] WARN oakcproducer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 7 on topic-partition test-topic-2-0, retrying (17 attempts left). Error: NOT_ENOUGH_REPLICAS
[...]
4. 結論
在這個簡短的教學中,我們探索了KafkaProducer
的重試配置。我們學習如何設定最大傳遞時間、指定重試次數以及配置失敗嘗試之間的延遲。
與往常一樣,程式碼可以在 GitHub 上取得。