如何將 Kafka 與 ElasticSearch 連接起來
1. 概述
在本教程中,我們將學習如何使用Kafka Connector Sink將Apache Kafka連接到ElasticSearch 。
Kafka 專案提供了 Kafka Connect,這是一個強大的工具,可以實現 Kafka 與外部資料儲存來源的無縫集成,而無需額外的程式碼或應用程式。
2.為什麼要使用 Kafka Connect?
Kafka Connect 提供了一種在 Kafka 和各種資料儲存(包括 ElasticSearch)之間傳輸資料的簡單方法。我們不需要編寫自訂應用程式來使用 Kafka 的資料並將其轉儲到 ElasticSearch 中,而是可以使用它,因為它專為可擴展性、容錯性和可管理性而設計。 Kafka Connect 的一些好處包括:
- 可擴充性:Kafka Connect 可以以分散式模式運行,允許多個工作進程分擔負載
- 容錯:自動處理故障,以便能夠保持資料的正確性和完整性。這也使我們的管道更具彈性
- 自助服務連接器:無需編寫自訂整合元件或服務
- 高度可配置:透過簡單的配置和 API 輕鬆設定和管理
3.Docker 設定
讓我們使用Docker來部署和管理我們的安裝。這將簡化設定並減少平台依賴問題。各個團隊都維護著所有所需服務的官方圖像。
我們將定義一個 Docker Compose 檔案來啟動服務:Kafka、Zookeeper、ElasticSearch 和 Kafka Connect。在本文中,我們不會深入討論 Kafka 設置,但在這裡我們將了解更多有關它的資訊。
第一步是建立 Docker Compose 檔案:
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0
environment:
discovery.type: single-node
xpack.security.enabled: "false"
ports:
- "9200:9200"
kafka-connect:
image: confluentinc/cp-kafka-connect:latest
depends_on:
- kafka
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_GROUP_ID: kafka-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
基本上,我們創建了 Zookeeper 來保存我們的 Kafka 叢集設置,一個 Kafka 代理來處理我們的主題數據,並將其指向 Zookeeper 服務。然後,我們也建立了一個 ElasticSearch 實例,為了簡單起見,我們停用了身份驗證。
我們的 Kafka Connect 基本屬性需要最少的設定即可在本地運行我們的 Kafka 連接器。它們設定諸如複製因子、預設轉換器和 Kafka 叢集位址等內容。若要了解所有配置, 請查看官方文件頁面。
需要強調的是,不建議將上述配置用於生產。相反,它是使用 Kafka Connectors 的快速入門指南。彈性和容錯能力不是本文關注的重點。
一旦我們了解 Docker Compose 檔案的內容,我們就可以運行我們的服務:
# use -d to run in background
docker compose up
一旦容器運行,我們需要手動安裝 Elasticsearch Sink Connector,因為它沒有內建在 Kafka Connector 中。為此,讓我們執行以下命令:
docker exec -it kafka-elastic-search-kafka-connect-1 bash -c
"confluent-hub install --no-prompt
confluentinc/kafka-connect-elasticsearch:latest"
然後,接下來我們需要重新啟動Kafka Connect服務,以便可以開始使用新的Sink:
docker restart kafka-elastic-search-kafka-connect-1
最後,為了檢查一切是否按預期工作,我們可以呼叫 Kafka Connect API 來檢查可用的 Sinks:
curl -s http://localhost:8083/connector-plugins | jq .
我們應該在回應中看到io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
。
4. 你好,世界
現在,讓我們嘗試發送第一條訊息,從 Kafka 流向 ElasticSearch。為了做到這一點,我們首先需要建立主題,如下所示:
docker exec -it $(
docker ps --filter "name=kafka-elastic-search-kafka-1" --format "{{.ID}}"
) bash -c
"kafka-topics --create --topic logs
--bootstrap-server kafka:9092
--partitions 1
--replication-factor 1"
這將在 Kafka Broker 中創建我們的 Kafka 主題。接下來,讓我們建立一個名為test-connector.json
的檔案:
{
"name": "elasticsearch-sink-connector-test",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"connection.url": "http://elasticsearch:9200",
"tasks.max": "1",
"topics": "logs",
"key.ignore": "true",
"schema.ignore": "true"
}
}
該檔案包含我們的 Kafka Connector Sink 及其配置。我們稍後會更好地理解這些配置,但我們需要知道這是透過 API 建立連接器所需的有效負載。該文件的前四個屬性與本文中所有其他範例的屬性相同,因此為簡單起見,我們將省略它們。
現在讓我們建立 Kafka 連接器:
curl -X POST -H 'Content-Type: application/json' --data @test-connector.json http://localhost:8083/connectors
透過這樣做,我們的連接器就創建好了,它應該正在運行以確認我們可以使用 JSON 檔案中定義的連接器名稱並使用另一個 Kafka Connect API 進行查詢:
curl http://localhost:8083/connectors/elasticsearch-sink-connector-test/status
這應該可以確認我們的連接器已啟動並正在運行。
現在我們知道我們的連接器正在運行,讓我們發送第一個訊息。為了模擬 Kafka 生產者,我們可以運行以下行:
docker exec -it $(docker ps --filter "name=kafka-elastic-search-kafka-1" --format "{{.ID}}")
kafka-console-producer --broker-list kafka:9092 --topic logs
上面的命令創建了一個互動式提示,讓我們可以將訊息發送到我們的logs
Kafka 主題。我們可以創建任何有效的 JSON 並按回車鍵發送訊息:
{"message": "Hello word", "timestamp": "2025-02-05T12:00:00Z"}
{"message": "Test Kafka Connector", "timestamp": "2025-02-05T13:00:00Z"}
為了驗證資料是否到達 ElasticSearch,我們可以開啟另一個終端並呼叫:
curl -X GET "http://localhost:9200/logs/_search?pretty"
我們可以觀察到,資料自動從我們的 Kafka 主題流向 ElasticSearch;將我們的主題綁定到我們的 ElasticSearch 索引是唯一需要的步驟。然而,該連接器提供的功能遠不止這些。
5. Kafka Connect Elasticsearch Sink 的高階場景
如前所述,Kafka 連接器是功能強大的工具,它提供了整合資料儲存和 Kafka 的強大機制。 Kafka Connect 提供了廣泛的設定選項,讓使用者可以定義他們的資料管道來滿足他們的用例。
處理分散式訊息或資料流可能是一個非常複雜的問題。此工具旨在簡化它。讓我們考慮一些常見的場景。
5.1. Kafka Avro 訊息傳送到 Elasticsearch
許多專案都使用 Avro 格式,因為它在序列化和模式演變方面效率很高。當使用 Avro 時,Elasticsearch 應該會根據模式自動偵測欄位類型。讓我們研究如何在與 Elasticsearch 整合時利用 Avro 模式。
首先,我們需要一個 Avro 模式登錄:
schema-registry:
image: confluentinc/cp-schema-registry:latest
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:9092"
SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
第一步是將此新服務新增至我們的 Docker Compose 檔案並執行:
docker compose up -d
一旦我們有了模式註冊表,我們就可以建立一個新主題來保存我們的 Avro 訊息:
docker exec -it $(
docker ps --filter "name=kafka-elastic-search-kafka-1" --format "{{.ID}}"
) bash -c
"kafka-topics --create
--topic avro_logs
--bootstrap-server kafka:9092
--partitions 1
--replication-factor 1"
下一步是建立一個名為avro-sink-config.json
的新連接器設定檔:
{
"name": "avro-elasticsearch-sink",
"config": {
...
"key.ignore": "true",
"schema.ignore": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
讓我們花一點時間來了解這個文件:
-
schema.ignore
:這告訴連接器使用訊息模式來建立 ElasticSearch 文件。在這種情況下,架構註冊表定義將用於定義索引映射 -
value.converter
:告訴連接器訊息遵循 Avro 格式(io.confluent.connect.avro.AvroConverter
) -
value.converter.schema.registry.url
:指定架構登錄位置
了解了配置後,我們可以繼續建立連接器:
curl -X POST -H "Content-Type: application/json" --data @avro-sink-config.json http://localhost:8083/connectors
我們可以透過檢查狀態來確認連接器是否正在運行,就像之前所做的那樣。確認後,我們可以繼續建立 Avro 訊息:
docker exec -it $(
docker ps --filter "name=kafka-elastic-search-schema-registry-1" --format "{{.ID}}"
) kafka-avro-console-producer
--broker-list kafka:9092
--topic avro_logs
--property value.schema='{
"type": "record",
"name": "LogEntry",
"fields": [
{"name": "message", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}'
提示準備好後,讓我們發送一條測試訊息,例如:
{"message": "My Avro message", "timestamp": 1700000000}
最後,讓我們看看 ElasticSearch 並查看我們的訊息和映射:
curl -X GET "http://localhost:9200/avro_logs/_search?pretty"
進而:
curl -X GET "http://localhost:9200/avro_logs/_mapping"
我們可以看到,映射是使用模式建立的。
在進行下一個測試之前,我們先進行清理:
curl -X DELETE "http://localhost:9200/avro_logs"
進而:
curl -X DELETE "http://localhost:8083/connectors/avro-elasticsearch-sink"
這將刪除 Kafka 連接器和 ElasticSearch 索引。
5.2.時間戳轉換
讓我們使用一個新的連接器設定檔timestamp-transform-sink.json
來自動將紀元時間戳記轉換為ISO-8601格式。配置如下:
{
"name": "timestamp-transform-sink",
"config": {
...
"topics": "epoch_logs",
"key.ignore": "true",
"schema.ignore": "true",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "timestamp",
"transforms.TimestampConverter.target.type": "string",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ssZ"
}
}
讓我們看看以下亮點:
-
transforms
:定義轉換名稱,以便套用於我們的資料處理管道 -
TimestampConverter
:定義轉換,從訊息中提取一個欄位並使用特定格式進行轉換
然後,我們建立連接器:
curl -X POST -H "Content-Type: application/json" --data @timestamp-transform-sink.json http://localhost:8083/connectors
我們來測試一下:
docker exec -it $(
docker ps --filter "name=kafka-elastic-search-kafka-1" --format "{{.ID}}"
) kafka-console-producer
--broker-list kafka:9092
--topic epoch_logs
發送訊息:
{"message": "Timestamp transformation", "timestamp": 1700000000000}
為了確認這一點,讓我們運行:
curl -X GET "http://localhost:9200/epoch_logs/_search?pretty"
進而:
curl -X GET "http://localhost:9200/epoch_logs/_mapping"
在這裡,我們看到了時間戳記是如何轉換的,並且 ElasticSearch 正確地將欄位對應到資料類型。
5.3.忽略並記錄錯誤
預設情況下,連接器有一個名為**errors.tolerance, which is**
定義為**none.**
這意味著當發生錯誤時連接器將停止處理。然而,有時,在即時處理時,這可能不是一個好主意。因此,現在讓我們看看如何讓連接器忽略錯誤並繼續前進。
再次,我們首先創建一個主題:
docker exec -it $(
docker ps --filter "name=kafka-elastic-search-kafka-1" --format "{{.ID}}"
) bash -c
"kafka-topics --create
--topic test-error-handling
--bootstrap-server kafka:9092
--partitions 1
--replication-factor 1"
然後,我們將配置連接器error-handling-sink-config.json
:
{
"name": "error-handling-elasticsearch-sink",
"config": {
...
"topics": "test-error-handling",
"key.ignore": "true",
"schema.ignore": "true",
"behavior.on.malformed.documents": "warn",
"behavior.on.error": "LOG",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
主要屬性:
-
behavior.on.malformed.documents
:記錄無效文件而不是停止連接器 -
errors.tolerance
:允許 Kafka Connect 在出現錯誤的情況下繼續處理有效訊息 -
errors.log.enable
:將錯誤記錄到 Kafka Connect 日誌中 -
errors.log.include.messages
:在日誌中包含實際的問題訊息
現在我們註冊連接器:
curl -X POST -H "Content-Type: application/json" --data @error-handling-sink-config.json http://localhost:8083/connectors
然後我們打開一個控制台來測試一下:
docker exec -it $(
docker ps --filter "name=kafka-elastic-search-kafka-1" --format "{{.ID}}"
) kafka-console-producer
--broker-list kafka:9092
--topic test-error-handling
接下來,我們發送以下訊息:
{"message": "Ok", "timestamp": "2025-02-08T12:00:00Z"}
{"message": "NOK", "timestamp": "invalid_timestamp"}
{"message": "Ok Again", "timestamp": "2025-02-08T13:00:00Z"}
最後,我們來檢查一下 ElasticSearch:
curl -X GET "http://localhost:9200/test-error-handling/_search?pretty"
我們可以確認只有第一條和最後一則訊息被編入索引。現在,讓我們檢查連接器日誌:
docker logs kafka-elastic-search-kafka-connect-1 | grep "ERROR"
日誌顯示處理主題偏移量 1 時發生錯誤。但是,連接器狀態正在運行,這正是我們希望發生的。
5.4.在 Elasticsearch 中微調批量處理和刷新
當談到高效處理大規模資料流時,許多變數都會發揮作用。因此,我們這次不會測試特定場景。相反,讓我們花點時間看看 ElastickSearch Connector Sink 為我們提供的不同參數,以便微調我們的用例。
這些配置的組合將直接影響我們的效率和可擴展性。因此,必須正確設計一些容量計劃並針對不同的配置組合執行該計劃,以了解它們如何影響我們的工作負載。現在讓我們檢查與提取和資料刷新相關的最相關的配置:
參數名稱 | 預設值 |
batch.size | 2000(可從 1 到 1000000) |
bulk.size.bytes | 5兆位元組(可達GB) |
max.in.flight.requests | 5(可從 1 到 1000) |
max.buffered.records | 20000(範圍從 1 到 2147483647) |
linger.ms | 1(範圍從 0 到 604800000) |
flush.timeout.ms | 3 分鐘(最長可達數小時) |
flush.synchronously | 錯誤的 |
max.retries | 5 |
retry.backoff.ms | 100 |
connection.compression | 錯誤的 |
write.method | INSERT(也可以是 UPSERT) |
read.timeout.ms | 3 分鐘(最長可達數小時) |
有關詳盡列表,我們可以查看官方文件頁面。
6. 結論
按照本指南,我們已成功使用 Kafka Connect Sink 建立了從 Kafka 到 Elasticsearch 的近乎即時的資料管道。附加的測試場景確保了處理各種現實世界資料轉換和提取策略的靈活性。我們也了解了該連接器為我們提供的所有控制和機制,以便微調我們的流管道。
與往常一樣,本文使用的所有程式碼範例均可在 GitHub 上找到。