如何為 Kafka 中的現有主題新增分區
1. 概述
Kafka 是一種非常流行的訊息佇列,具有許多功能。我們將訊息儲存在 Kafka 的主題內。主題又被劃分為實際儲存訊息的分區。我們可能會需要增加主題內的分區數量。在本教程中,我們將學習如何實現這一特定場景。
2. 增加分區的原因
在討論如何增加分區之前,有必要討論我們為什麼這樣做。所以,有些場景我們需要增加Kafka中的分割區。這裡列出了一些場景:
- 當生產者生產大量訊息,而 Kafka 分區無法跟上時
- 當我們在消費者群組中新增消費者來處理並行處理時
- 當某些分區處理過多的資料時
- 為了容錯
- 考慮未來的需求,主動增加分區。
因此,我們可以理解新增分區的原因有很多。現在的問題是,我們該如何做到這一點?在下一節中,我們將學習實現此功能的兩種方法。
3.如何新增分區
Kafka提供了兩種新增分區的方法。一種方法是在 CLI 上執行 Kafka 腳本,另一種方法是使用Kafka Admin API ,一種以程式設計方式新增分區的方法。讓我們逐一學習如何使用它們。
3.1.使用 Kafka 腳本
Kafka 提供了kafka-topics.sh
腳本來在主題中新增新的分區。以下是 CLI 指令:
$ bin/kafka-topics.sh --bootstrap-server <broker:port> --topic <topic-name> --alter --partitions <number>
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic my-topic --alter --partitions 3
值得注意的是,這裡的分區數是3
,其中包括現有分區的數量以及新分區的數量。此命令可確保代理總共有三個分割區。
3.2.使用 Kafka API
Kafka 也可以透過其 Admin API 以程式設計方式完成相同的任務。 API 非常簡單,它只需要上述 CLI 指令所需的參數。
首先,我們需要將 Kafka 客戶端庫新增到我們的專案中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>
我們可以從Maven Central找到該依賴項的最新版本。
現在,讓我們了解如何以程式設計方式增加分割區:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
adminClient.createPartitions(Collections.singletonMap("my-topic", NewPartitions.increaseTo(3)))
.all()
.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
我們可以看到, AdminClient
需要與代理程式位址、主題名稱和分區總數相關的參數。再次,我們可以從方法名稱本身看到 - increaseTo() –
它的參數期望分區的數量,包括要新增的新分區的數量以及現有分區的數量。
4. 增加分區時的常見陷阱
雖然新增分區似乎是一項相當容易的任務,但它需要付出代價,涉及我們將在本節中討論的幾個注意事項。讓我們討論一些重要的陷阱。
4.1.對訊息排序的影響
我們知道Kafka可以保證分區內訊息的有序性,但不能保證跨分區訊息的有序性。當我們增加分區時,鍵的雜湊值可能會發生變化,導致某些鍵重新指派給不同的分區。這可能會擾亂特定鍵的訊息順序。
如果在我們的系統中,訊息排序至關重要,那麼如果它取決於分區的數量,那麼這種重新散列可能會導致問題。為了避免這種情況,我們不應該使用可能導致問題的關鍵計算函數。我們的關鍵計算函數和分區應該能夠解決這個問題。
4.2.消費者再平衡
新增分區會觸發消費者群組重新平衡,這可能會暫時中斷訊息消費。當重新平衡發生時,消費者可能會短暫停止處理。
因此,有必要在非尖峰時段或計畫的維護時段執行分區增加。我們應該使用Kafka 的正常關閉和重新平衡設定來最大限度地減少中斷。
4.3.增加 Broker 和叢集負載
如果分區數量增加很多,那麼可能會對我們的 Kafka 群造成壓力。這是因為更多的分區意味著代理上的元資料和管理開銷更多。
這就是為什麼我們需要確保在增加分割區之前和之後 Broker 擁有充足的資源(例如 CPU、記憶體和磁碟使用率)。我們需要確保我們的代理有足夠的規模來處理新的負載。
4.4.重新劃分複雜性
雖然 Kafka 允許我們動態新增分區,但它不會在新分區之間重新分配現有資料。因此,只有新資料才會寫入附加分割區,這會導致資料分佈不均勻。
因此,我們有責任重新處理舊資料並將其重新分配到新的分區。避免過於頻繁地添加分區非常重要。我們應該始終規劃我們的分區策略來應對長期成長。
除了上述注意事項之外,我們還面臨更多挑戰,例如客戶端配置問題、延遲問題、分區策略問題等。因此,在非生產伺服器上進行規劃和測試以減輕這些風險非常重要。
5. 結論
在本文中,我們了解了為什麼需要在 Kafka 中新增分區。我們也研究了兩種新增分割區的方法—透過 CLI 和 Kafka Admin API。最後,我們討論了在新增新分區時可能遇到的一些陷阱以及如何預測它們。
本文使用的所有程式碼範例均可在 GitHub 上找到。