使用 Java 處理 Kafka Producer TimeOutException
1. 概述
在本教程中,我們將學習如何在 Kafka Producer 中處理TimeOutException
。
首先,讓我們來了解發生TimeOutException
時可能出現的情況,然後看看如何解決該問題。
2.Kafka Producer中的TimeOutException
我們透過建立ProducerRecord
開始向 Kafka 產生訊息,其中必須包含我們要將記錄傳送到的主題和一個值。或者,我們還可以指定一個鍵、一個分區、一個時間戳記和/或一個標頭集合。
然後分區器為我們選擇一個分區,通常是基於ProducerRecord
鍵。一旦分區器選擇了分區,生產者就會識別記錄的主題和分區。然後,生產者將記錄新增到一批記錄中,並將該記錄傳送到相同主題和分區(我們將其視為緩衝區)。一個單獨的執行緒負責將這些批次的記錄傳送到適當的 Kafka 代理程式。
Kafka 在從生產者向代理程式發送訊息時使用緩衝概念。一旦我們從KafkaProducer
呼叫send()
方法來傳送ProducerRecord
,系統就會將訊息放入緩衝區中,並在單獨的執行緒中將其傳送到緩衝區。
請求逾時或batch size過大會導致KafkaProducer
出現TimeOutException
,即超出緩衝區限製或遇到網路瓶頸。我們來一一了解一下。
3. 請求超時
將記錄新增至批次後,我們需要在指定的時間內傳送該批次,以確保按時傳送。設定參數request.timeout.ms
控制時間限制,預設為三十秒:
producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
我們更改請求逾時,以便為發送每個批次留出更多時間。一旦批次排隊時間超過 60 秒,我們就會收到TimeOutException
。
4. 大批量
kafka生產者等待將緩衝區中的資料傳送到broker,直到滿足batch size。如果生產者不滿足批量大小,請求就會逾時。所以我們可以減少batch-size,減少請求逾時的可能性:
producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 100000);
透過減少批量大小,我們可以更頻繁地以更少的訊息批量發送到代理。這可能會避免TimeOutException
。
5. 網路瓶頸
如果我們以高於發送者執行緒處理能力的速率向代理程式發送訊息,則可能會導致網路瓶頸,從而導致TimeOutException
。我們可以使用configuration linger.ms
來處理這個問題:
producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
linger.ms
屬性控制在傳送目前批次之前等待其他訊息的時間量。 KafkaProducer
在填滿目前批次或達到linger.ms limit
時發送一批訊息。
預設情況下,一旦有發送者執行緒可用於發送訊息,生產者就會立即發送訊息,即使批次中只有一則訊息。透過將 linger.ms 設定為大於 0,我們指示生產者等待幾毫秒以將其他訊息新增至批次中,然後再將其傳送到代理程式。
這會稍微增加延遲並顯著提高吞吐量 - 每個訊息的開銷要低得多,並且壓縮(如果啟用)會更好。
6. 複製因子
Kafka 提供複製策略的設定。主題級配置和代理級配置均引用min.insync.replicas
。
如果複製因子小於min.insync.replicas,
寫入不會獲得足夠的確認,因此會逾時。使用複製因子 > min.insync.replicas
重新建立主題可以修復此問題。
在配置叢集的資料持久性時,我們可以透過將min.insync.replicas 設為2 來確保生產者至少有兩個被擷取且「同步」的副本。此設定”所有”請求。這可確保至少有兩個副本(領導者和另一個副本)確認寫入成功。
這可以防止在領導者確認寫入然後發生故障並且領導權轉移到沒有成功寫入的副本的情況下丟失資料。如果沒有這些持久的設置,生產者會認為它已成功生產,並且訊息會掉在地板上並丟失。
但是,由於涉及額外的開銷,配置更高的持久性會導致效率降低,因此 kafka 不建議具有高吞吐量、可以容忍偶爾訊息遺失的叢集更改此設定(預設值 1)。
7. 引導伺服器位址
一些與網路相關的問題也可能導致TimeOutException
。
防火牆可能會阻止 Kafka 端口,無論是在生產者端、在代理端還是在中間的某個位置。從運行生產者的伺服器嘗試nc -z broker-ip <port_number>
:
$ nc -z 192.168.123.132 9092
我們發現防火牆是否封鎖了該連接埠。
如果DNS解析被破壞,即使連接埠開放,生產者也找不到IP位址。因此,如果其餘的事情都很好,我們也可以檢查一下。
八、結論
在本文中,我們了解到KafkaProducer
類別中的TimeOutException
可能是由請求逾時、批次大小或網路瓶頸引起的。我們也遇到了其他可能性,例如錯誤的複製因子或伺服器位址配置。
與往常一樣,本文中使用的完整程式碼可以在 GitHub 上找到。