使用 Spring Boot 在 Elasticsearch 中匯入 CSV
一、簡介
在本教程中,我們將學習如何使用 Spring Boot 將資料從 CSV 檔案匯入 Elasticsearch。當我們需要從遺留系統或外部來源遷移資料或準備測試資料集時,從 CSV 檔案匯入資料是常見的用例。
2. 使用 Docker 設定 Elasticsearch
要使用 Elasticsearch,我們將使用 Docker 在本地進行設定。請依照下列步驟啟動 Elasticsearch 容器:
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0
接下來,我們使用以下命令運行容器:
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:8.17.0
讓我們使用以下資料建立一個範例 Excel 檔案「 products.csv
」:
id,name,category,price,stock
1,Microwave,Appliances,705.77,136
2,Vacuum Cleaner,Appliances,1397.23,92
...
3. 使用手動for
循環處理 CSV 數據
第一種方法涉及使用手動for
循環讀取 CSV 檔案中的記錄並將其索引到 Elasticsearch 中。為了實現此方法,我們將使用Apache Commons CSV庫來解析 CSV 文件,並使用 Elasticsearch Rest High-Level Client來與 Elasticsearch 搜尋引擎整合。
讓我們先將所需的依賴項新增到pom.xml
檔案中:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.11</version>
</dependency>
新增依賴項後,我們需要設定 Elasticsearch 配置。讓我們建立一個配置類別來設定RestHighLevelClient
:
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient restHighLevelClient() {
return RestClients.create(ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build()).rest();
}
}
接下來,我們建立一個Product
類別來表示 CSV 資料:
@Document(indexName = "products")
public class Product {
@Id
private String id;
private String name;
private String category;
private double price;
private int stock;
// Getters and setters
}
之後,我們將在 Spring Boot 應用程式中建立一個服務來處理 CSV 匯入過程。在服務中,我們使用for
迴圈來迭代 CSV 檔案中的每筆記錄:
@Autowired
private RestHighLevelClient restHighLevelClient;
public void importCSV(File file) {
try (Reader reader = new FileReader(file)) {
Iterable<CSVRecord> records = CSVFormat.DEFAULT
.withHeader("id", "name", "category", "price", "stock")
.withFirstRecordAsHeader()
.parse(reader);
for (CSVRecord record : records) {
IndexRequest request = new IndexRequest("products")
.id(record.get("id"))
.source(Map.of(
"name", record.get("name"),
"category", record.get("category"),
"price", Double.parseDouble(record.get("price")),
"stock", Integer.parseInt(record.get("stock"))
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
} catch (Exception e) {
// handle exception
}
}
對於每筆記錄,我們建構一個IndexRequest
物件來準備資料以便在 Elasticsearch 中建立索引。然後使用RestHighLevelClient
對資料進行索引,這是與 Elasticsearch 互動的主要客戶端程式庫。
讓我們將 CSV 檔案中的資料匯入到 Elasticsearch 索引中:
File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile();
importCSV(csvFile);
接下來,讓我們查詢第一個索引並根據預期值驗證其內容:
IndexRequest firstRequest = captor.getAllValues().get(0);
assertEquals(Map.of(
"name", "Microwave",
"category", "Appliances",
"price", 705.77,
"stock", 136
), firstRequest.sourceAsMap());
這種方法很簡單,讓我們可以完全控制整個過程。然而,它更適合較小的資料集,因為對於大檔案來說它可能效率低且耗時。
4. 使用 Spring Batch 進行可擴充的資料導入
Spring Batch 是一個強大的 Java 批次框架。它非常適合透過分塊處理資料來處理大規模資料導入。
要使用 Spring Batch,我們需要將Spring Batch 依賴項新增到pom.xml
檔案中:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.4.1</version>
</dependency>
4.1.定義 Spring 設定檔
接下來,讓我們建立一個設定類別來定義批次作業。在此配置中,我們使用@EnableBatchProcessing
註解來啟動 Spring Batch 功能,該功能可讓我們建立和管理批次作業。
我們設定一個FlatFileItemReader
來讀取 CSV 文件,並設定一個ItemWriter
將資料寫入 Elasticsearch。我們也在 Spring 設定檔中建立並配置一個RestHighLevelClient
bean:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
// ...
@Autowired private RestHighLevelClient restHighLevelClient
}
4.2.定義一個讀者
要從 CSV 檔案讀取數據,我們建立一個reader()
方法並定義一個FlatFileItemReader
。我們將使用FlatFileItemReaderBuilder
透過各種設定來配置閱讀器:
@Bean
public FlatFileItemReader<Product> reader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new FileSystemResource("products.csv"))
.delimited()
.names("id", "name", "category", "price", "stock")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Product.class);
}})
.build();
}
我們使用name()
方法為讀取器指派一個名稱,這有助於在批次作業中識別它。此外, resource()
方法透過使用FileSystemResource
指定CSV檔案「 products.csv
」的位置。該檔案需要分隔(逗號分隔),這是透過delimited()
方法指定的。
names()
方法列出 CSV 檔案中的列標題,並將它們對應到Product
類別的欄位。最後, fieldSetMapper()
方法使用BeanWrapperFieldSetMapper
將 CSV 檔案的每一行對應到Product
物件。
4.3.定義一個作家
接下來,我們建立一個writer()
方法來將處理後的資料寫入 Elasticsearch。此方法定義一個接收Product
清單的ItemWriter
。它使用RestHighLevelClient
與 Elasticsearch 互動:
@Bean
public ItemWriter<Product> writer(RestHighLevelClient restHighLevelClient) {
return products -> {
for (Product product : products) {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(Map.of(
"name", product.getName(),
"category", product.getCategory(),
"price", product.getPrice(),
"stock", product.getStock()
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
};
}
對於清單中的每個產品,我們建立一個IndexRequest
來指定 Elasticsearch 索引和文件結構。 id()
方法使用Product
物件的ID
為每個文檔分配一個唯一的 ID。
source()
方法會將Product
的欄位(例如name
、 category
、 price
和stock
)對應為 Elasticsearch 可以儲存的鍵值格式。配置請求後,我們使用client.index()
方法將Product
記錄傳送到 Elasticsearch,確保product
被索引以供搜尋和檢索。
4.4.定義 Spring Batch 作業
最後,讓我們建立importJob()
方法並使用 Spring Batch 的JobBuilder
和StepBuilder
來設定作業及其步驟:
@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager,
RestHighLevelClient restHighLevelClient) {
return new JobBuilder("importJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Product, Product>chunk(10, transactionManager)
.reader(reader())
.writer(writer(restHighLevelClient))
.build())
.build();
}
在本例中,我們使用JobBuilder
來設定作業。它將作業名稱“ importJob
”和JobRepository
作為參數。我們也配置了一個名為「 step1
」的步驟,並指定該作業一次處理10
記錄。 transactionManager
確保區塊處理過程中資料的一致性。
reader()
和writer()
方法整合到處理從 CSV 到 Elasticsearch 的資料流的步驟中。接下來,我們使用start(
) 方法將作業與步驟連結。此連線可確保該步驟作為作業的一部分執行。完成此配置後,我們可以使用 Spring 的JobLauncher
執行作業。
4.5.執行批次作業
讓我們來看看使用JobLauncher
執行 Spring Batch 作業的程式碼。我們將建立一個CommandLineRunner
bean 以在應用程式啟動時執行作業:
@Configuration
public class JobRunnerConfig {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importJob;
@Bean
public CommandLineRunner runJob() {
return args -> {
try {
JobExecution execution = jobLauncher.run(importJob, new JobParameters());
} catch (Exception e) {
// handle exception
}
};
}
}
成功運行作業後,我們可以透過使用curl
發出請求來測試結果:
curl -X GET "http://localhost:9200/products/_search" \
-H "Content-Type: application/json" \
-d '{
"query": {
"match_all": {}
}
}'
讓我們看看預期的結果:
{
...
"hits": {
"total": {
"value": 25,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_score": 1.0,
"_source": {
"id": "1",
"name": "Microwave",
"category": "Appliances",
"price": 705.77,
"stock": 136
}
},
{
"_index": "products",
"_type": "_doc",
"_id": "2",
"_score": 1.0,
"_source": {
"id": "1",
"name": "Vacuum Cleaner",
"category": "Appliances",
"price": 1397.23,
"stock": 92
}
}
...
]
}
}
此方法的設定比以前的方法更複雜,但為導入資料提供了可擴展性和靈活性。
5.使用Logstash導入CSV數據
Logstash 是 Elastic 堆疊的一部分,專為資料處理和攝取而設計。
我們可以使用Docker來快速設定Logstash。首先,讓我們拉取並運行 Logstash 鏡像:
docker pull docker.elastic.co/logstash/logstash:8.17.0
拉取鏡像後,我們為 Logstash 建立一個設定檔「 csv-to-es.conf
」。該檔案定義了 Logstash 如何讀取 CSV 檔案並將資料傳送到 Elasticsearch:
input {
file {
path => "/path/to/your/products.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
columns => ["id", "name", "category", "price", "stock"]
}
mutate {
convert => { "price" => "float" }
convert => { "stock" => "integer" }
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "products"
}
stdout {
codec => json_lines
}
}
在此文件中,我們定義資料管道的輸入、過濾器和輸出階段。輸入階段指定要讀取的 CSV 文件,而過濾階段則處理和轉換資料。最後,輸出階段將處理後的資料傳送到Elasticsearch。
設定設定檔後,我們需要呼叫docker run
指令來執行 Logstash 管道:
docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
-v $(pwd)/products.csv:/usr/share/logstash/products.csv \
docker.elastic.co/logstash/logstash:8.17.0
此命令將我們的配置和 CSV 檔案掛載到 Logstash 容器,並運行資料管道以將資料匯入 Elasticsearch。成功運行命令後,我們可以再次執行curl
查詢來驗證結果。
Logstash 無需自訂程式碼即可有效地將 CSV 資料匯入 Elasticsearch,使其成為處理大型資料集和設定自動化資料管道的熱門選擇。
六、總結
現在我們已經探索了將 CSV 檔案中的資料匯入 Elasticsearch 的三種方法,讓我們比較一下它們的優缺點:
方法 | 優點 | 缺點 |
---|---|---|
手動 For 循環 | 易於實施;完全控制 | 對於大文件效率不高 |
春季批次 | 可針對大型資料集進行擴展 | 適合初學者的複雜設置 |
日誌儲存 | 無需編碼;高效能 | 需要安裝 Logstash |
七、結論
在本文中,我們介紹如何使用三種方法將 CSV 資料匯入 Elasticsearch:手動 for 迴圈、Spring Batch 和 Logstash。每種方法都有其優點並且適合不同的用例。
與往常一樣,本文的完整實作可以在 GitHub 上找到。