Kafka 生產者和消費者訊息確認選項
一、簡介
眾所周知,Apache Kafka 是一個訊息傳遞和串流媒體系統。它提供了確保可靠性保證的確認選項。在本教程中,我們將了解 Apache Kafka 中生產者和消費者的確認選項。
2. 生產者致謝選項
即使有可靠配置的 Kafka 代理,我們也必須將生產者配置為可靠。我們可以使用三種確認模式之一來配置生產者,我們將在接下來介紹。
2.1.不確認
我們可以將屬性acks
設定為0
:
static KafkaProducer<String, String> producerack0;
static Properties getProducerProperties() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
return producerProperties;
}
static void setUp() throws IOException, InterruptedException {
Properties producerProperties = getProducerProperties();
producerProperties.put(ProducerConfig.ACKS_CONFIG,
"0");
producerack0 = new KafkaProducer<>(producerProperties);
}
在此配置中,生產者不會等待代理的回應。它假定訊息發送成功。如果出現問題且代理程式沒有收到訊息,生產者不會意識到這一點,而且訊息會遺失。
但是,由於生產者不等待伺服器的任何回應,因此它可以以網路支援的速度發送訊息,從而實現高吞吐量。
如果生產者設法透過網路發送訊息,則認為該訊息已成功寫入 Kafka。由於發送的物件無法序列化、網路卡故障等錯誤,導致客戶端訊息發送不成功。但在它到達代理之後,如果分區離線、領導者選舉正在進行,或者即使整個 Kafka 群集不可用,我們也不會收到任何錯誤。
使用acks=0
運行會導致生產者延遲較低,但它不會改善端對端延遲,因為消費者在系統將訊息複製到所有可用副本之前不會看到訊息。
2.2.致謝領導者
我們可以將屬性acks
設定為1
:
static KafkaProducer<String, String> producerack1;
static Properties getProducerProperties() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
return producerProperties;
}
static void setUp() throws IOException, InterruptedException {
Properties producerack1Prop = getProducerProperties();
producerack1Prop.put(ProducerConfig.ACKS_CONFIG,
"1");
producerack1 = new KafkaProducer<>(producerack1Prop);
}
當領導者副本收到訊息時,生產者會收到來自代理商的成功回應。如果由於任何錯誤而無法將訊息寫入領導者,生產者會收到錯誤回應並重試發送訊息,從而避免潛在的資料遺失。
我們仍然可能因其他原因丟失訊息,例如領導者在系統將最新消息複製到新領導者之前崩潰了。
領導者在收到訊息並將其寫入分區資料檔案時立即發送確認或錯誤。如果領導者關閉或崩潰,我們可能會遺失資料。崩潰可能會導致一些成功寫入和確認的訊息在崩潰之前無法複製到追蹤者。
使用acks=1配置,寫入領導者的速度可能比複製訊息的速度更快,在這種情況下,我們最終會得到複製不足的分區,因為領導者在複製訊息之前會確認來自生產者的訊息。當我們等待一個副本收到訊息時,延遲高於acks=0配置。
2.3.全部確認
我們也可以將屬性acks
設定為all
:
static KafkaProducer<String, String> producerackAll;
static Properties getProducerProperties() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
return producerProperties;
}
static void setUp() throws IOException, InterruptedException {
Properties producerackAllProp = getProducerProperties();
producerackAllProp.put(ProducerConfig.ACKS_CONFIG,
"all");
producerackAll = new KafkaProducer<>(producerackAllProp);
}
一旦所有同步副本收到訊息,生產者就會收到來自代理商的成功回應。這是最安全的模式,因為我們可以確保多個代理程式都擁有該訊息,並且即使在崩潰的情況下該訊息仍然存在。
領導者會等待所有同步副本收到訊息,然後再發回確認或錯誤。代理程式上的min.insync.replicas
配置允許我們指定生產者確認訊息之前必須接收的最小副本數。
這是最安全的選擇。生產者繼續嘗試發送訊息,直到完全提交為止。此配置的生產者延遲最高,因為生產者等待所有同步副本以取得所有訊息,然後才能將訊息批次標記為「完成」並繼續。
將acks
屬性設為值-1
相當於將其設為值all
。
屬性**acks
只能設定為三個可能值之一: 0
、 1
或all
/ -1
。如果設定為這三個以外的任何值,那麼 Kafka 會拋出ConfigException
** 。
對於acks
配置1
和all
,我們可以使用生產者屬性retries
、 retry.backoff.ms
和delivery.timeout.ms
來處理生產者重試。
3. 消費者確認選項
只有在 Kafka 將數據標記為已提交後,消費者才能存取數據,這確保系統將數據寫入所有同步副本。這保證了消費者收到一致的數據。他們唯一的責任是追蹤他們已閱讀和尚未閱讀的訊息。這是在使用訊息時不遺失訊息的關鍵。
當從分區讀取資料時,消費者會取得一批訊息,檢查該批次中的最後一個偏移量,然後從接收到的最後一個偏移量開始要求另一批訊息。這保證了 Kafka 消費者始終以正確的順序獲取新數據,而不會丟失任何訊息。
我們有四個消費者配置屬性,了解這些屬性對於配置我們的消費者以獲得所需的可靠性行為非常重要。
3.1.組號
每個 Kafka 消費者都屬於一個群組,由屬性group.id
標識。
假設一個群組中有兩個消費者,這表示兩者俱有相同的群組 ID。 Kafka 系統將每個消費者分配給主題中的分區子集。每個消費者單獨讀取訊息的子集。整個小組閱讀所有訊息。
如果我們需要消費者自己查看其訂閱主題中的每個訊息,則它需要一個唯一的group.id
。
3.2.自動偏移重置
屬性**auto.offset.reset
確定消費者在開始讀取沒有提交偏移量或無效提交偏移量的分區時的行為:**
static Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
return consumerProperties;
}
Properties consumerProperties = getConsumerProperties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
// ...
}
或者
static Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
return consumerProperties;
}
Properties consumerProperties = getConsumerProperties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
// ...
}
預設值是latest,
這意味著,如果缺少有效的偏移量,消費者就會從最新的記錄開始讀取。消費者僅考慮其開始運作後寫入的記錄。
財產的替代價值earliest
。這意味著,如果缺少有效的偏移量,消費者將從頭開始讀取分區中的所有資料。配置auto.offset.reset
設定為none
會在從無效偏移量消耗時導致異常。
3.3.啟用自動提交
屬性enable.auto.commit
控制消費者是否自動提交偏移量,預設為true
:
static Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
return consumerProperties;
}
Properties consumerProperties = getConsumerProperties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
// ...
}
如果設定為false
,我們可以控制系統何時提交偏移量。這使我們能夠最大限度地減少重複並避免丟失資料。
將屬性enable.auto.commit
設定為true
允許我們使用auto.commit.interval.ms
控制提交的頻率。
3.4.自動提交間隔
在自動提交配置中,屬性auto.commit.interval.ms
配置 Kafka 系統提交偏移量的頻率:
static Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
return consumerProperties;
}
Properties consumerProperties = getConsumerProperties();
consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 7000);
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
// ...
}
預設值為每五秒一次。一般來說,更頻繁地提交會增加開銷,但會減少消費者停止時可能發生的重複次數。
4. 結論
在本文中,我們了解了 Apache Kafka 的生產者和消費者確認選項以及如何使用它們。 Kafka 中的確認選項可讓開發人員微調效能和可靠性之間的平衡,使其成為適用於各種用例的多功能係統。
與往常一樣,本文中使用的完整程式碼可以在 GitHub 上找到。