Spring Batch 中的複合項目閱讀器
1. 簡介
在 Spring Batch 中, CompositeItemReader
是將多個ItemReader
實例組合成單一閱讀器的工具。當我們需要從多個來源或按照特定順序讀取資料時,這特別有用。例如,我們可能想要同時從資料庫和檔案中讀取記錄,或以特定順序處理來自兩個不同資料表的資料。
CompositeItemReader
簡化了批次作業中的多個讀取器的處理,確保了高效靈活的資料處理。在本教程中,我們將介紹 Spring Batch 中CompositeItemReader
的實現,並查看範例和測試案例來驗證其行為。
2. 了解CompositeItemReader
CompositeItemReader
工作原理是將讀取過程委託給ItemReader
實例清單。它按照定義的順序從每個讀取器讀取項目,確保按順序處理資料。
這在以下場景中尤其有用:
- 從多個資料庫或表中讀取
- 合併文件和資料庫中的數據
- 按特定順序處理來自不同來源的數據
此外, CompositeItemReader
是org.springframework.batch.item.support
套件的一部分,它是在 Spring Batch 5.2.0 中引入的。
3.實作CompositeItemReader
讓我們來看一個例子,其中我們從兩個不同的來源讀取資料:一個平面檔案和一個資料庫。目標是將來自兩個來源的產品資料合併為單一流以進行批次處理。有些產品位於平面文件中,而其他產品位於資料庫中,確保所有可用記錄一起處理。
3.1.建立Product
類別
在我們設定讀取器之前,我們需要一個代表正在處理的資料結構的Product
類別。此類封裝了有關產品的詳細信息,例如其 ID、名稱、庫存情況和價格。我們將在從 CSV 檔案和資料庫讀取時使用該模型,確保資料處理的一致性。
Product
類別充當我們的讀者和批次作業之間的資料傳輸物件 (DTO):
public class Product {
private Long productId;
private String productName;
private Integer stock;
private BigDecimal price;
public Product(Long productId, String productName, Integer stock, BigDecimal price) {
this.productId = productId;
this.productName = productName;
this.stock = stock;
this.price = price;
}
// Getters and Setters
}
Product
類別代表將由我們的批次作業處理的每筆記錄。現在我們的資料模型已經準備好了,我們將為 CSV 檔案和資料庫建立單獨的ItemReader
元件。
3.2. Product
資料平面檔案讀取器
第一個讀取器使用FlatFileItemReader
從 CSV 檔案中取得資料。我們將其配置為讀取分隔檔案( products.csv
)並將其欄位對應到Product
類別:
@Bean
public FlatFileItemReader<Product> fileReader() {
return new FlatFileItemReaderBuilder<Product>()
.name("fileReader")
.resource(new ClassPathResource("products.csv"))
.delimited()
.names("productId", "productName", "stock", "price")
.linesToSkip(1)
.targetType(Product.class)
.build();
}
這裡, delimited()
方法可確保資料欄位使用分隔符號(預設為逗號)分隔。 names()
方法定義與Product
類別的屬性相符的列名,而targetType(Product.class)
方法則將欄位對應到類別屬性。
3.3. Product
資料資料庫讀取器
接下來,我們定義一個JdbcCursorItemReader
來從名為products
的資料庫表中檢索產品資料。此讀取器執行 SQL 查詢來取得產品詳細資訊並將其對應到我們的Product
類別。
下面是資料庫讀取器的實作:
@Bean
public JdbcCursorItemReader<Product> dbReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Product>()
.name("dbReader")
.dataSource(dataSource())
.sql("SELECT productid, productname, stock, price FROM products")
.rowMapper((rs, rowNum) -> new Product(
rs.getLong("productid"),
rs.getString("productname"),
rs.getInt("stock"),
rs.getBigDecimal("price")))
.build();
}
JdbcCursorItemReader
使用遊標從資料庫中一次讀取一行產品記錄,從而提高批次效率。 rowMapper()
函數將結果集中的每一列對應到Product
類別中的對應欄位。
4. 使用CompositeItemReader
組合閱讀器
現在我們的 CSV 和資料庫讀取器都已配置為讀取產品數據,我們可以使用CompositeItemReader
將它們整合:
@Bean
public CompositeItemReader<Product> compositeReader() {
return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader()));
}
透過配置我們的CompositeItemReader
,我們可以依序處理來自多個來源的資料。
最初, FlatFileItemReader
從 CSV 檔案讀取產品記錄,將每一行解析為一個Product
物件。一旦文件中的所有行都已處理, JdbcCursorItemReader
將接管並開始直接從資料庫取得產品資料。
5.配置批次作業
一旦我們為 CSV 檔案和資料庫定義了讀取器,下一步就是配置批次作業本身。在 Spring Batch 中,一個作業由多個步驟組成,每個步驟處理處理管道的特定部分:
@Bean
public Job productJob(JobRepository jobRepository, Step step) {
return new JobBuilder("productJob", jobRepository)
.start(step)
.build();
}
@Bean
public Step step(ItemReader compositeReader, ItemWriter productWriter) {
return new StepBuilder("productStep", jobRepository)
.<Product, Product>chunk(10, transactionManager)
.reader(compositeReader)
.writer(productWriter)
.build();
}
在這種情況下,我們的工作包含一個步驟,即讀取產品資料、以10
區塊的形式處理它,並將其寫入所需的輸出。
productJob
bean 負責定義批次作業。它從productStep
開始執行,該 step 配置為處理產品資料處理。
透過這種設置,我們的批次作業首先使用CompositeItemReader
從兩個來源讀取產品數據,以 10 個區塊的形式進行處理,然後使用productWriter()
寫入轉換或過濾後的資料。這確保了批次流程順暢且有效率。
6. 執行批次作業
現在我們已經配置了讀取器和作業,下一步是執行批次作業並觀察CompositeItemReader
的行為。我們將在 Spring Boot 應用程式中執行該作業,以查看它如何處理來自 CSV 檔案和資料庫的資料。
為了以程式設計方式觸發批次作業,我們需要使用JobLauncher
。這使我們能夠啟動工作並監視其進度:
@Bean
public CommandLineRunner runJob() {
return args -> {
try {
jobLauncher.run(productJob, new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters());
} catch (Exception e) {
// handle exception
}
};
}
在這個範例中,我們建立一個CommandLineRunner
bean 來在應用程式啟動時執行該作業。這將使用JobLauncher
productJob
。我們還新增了帶有時間戳記的唯一JobParameters
,以確保作業每次都唯一地執行。
7. 測試複合項目閱讀器
為了確保CompositeItemReader
能如預期運作,我們將測試CompositeItemReader
的功能,以確保它能從 CSV 和資料庫來源正確讀取產品。
7.1.準備測試數據
我們先準備一個包含產品資料的 CSV 文件,作為CompositeItemReader
的輸入:
productId,productName,stock,price
101,Apple,50,1.99
然後,我們也會向products
表中插入一筆記錄:
@BeforeEach
public void setUp() {
jdbcTemplate.update("INSERT INTO products (productid, productname, stock, price) VALUES (?, ?, ?, ?)",
102, "Banana", 30, 1.49);
}
7.2.測試閱讀順序
現在,我們將測試CompositeItemReader
以驗證它是否按照正確的順序處理產品,並從 CSV 和資料庫來源讀取。在這個測試中,我們從 CSV 檔案中讀取產品,然後從資料庫中讀取,並斷言其值符合我們的預期:
@Test
public void givenTwoReaders_whenRead_thenProcessProductsInOrder() throws Exception {
StepExecution stepExecution = new StepExecution(
"testStep",
new JobExecution(1L, new JobParameters()),
1L);
ExecutionContext executionContext = stepExecution.getExecutionContext();
compositeReader.open(executionContext);
Product product1 = compositeReader.read();
assertNotNull(product1);
assertEquals(101, product1.getProductId());
assertEquals("Apple", product1.getProductName());
Product product2 = compositeReader.read();
assertNotNull(product2);
assertEquals(102, product2.getProductId());
assertEquals("Banana", product2.getProductName());
}
7.3.使用一個讀取器傳回空結果進行測試
在本節中,我們測試使用多個讀取器且其中一個讀取器傳回null
CompositeItemReader
的行為。這對於確保CompositeItemReader
跳過任何不返回資料的讀取器並繼續下一個讀取器直到找到有效資料非常重要:
@Test
public void givenMultipleReader_whenOneReaderReturnNull_thenProcessDataFromNextReader() throws Exception {
ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class);
when(emptyReader.read()).thenReturn(null);
ItemStreamReader<Product> validReader = mock(ItemStreamReader.class);
when(validReader.read()).thenReturn(new Product(103L, "Cherry", 20, BigDecimal.valueOf(2.99)), null);
CompositeItemReader<Product> compositeReader = new CompositeItemReader<>(
Arrays.asList(emptyReader, validReader));
Product product = compositeReader.read();
assertNotNull(product);
assertEquals(103, product.getProductId());
assertEquals("Cherry", product.getProductName());
}
8. 結論
在本文中,我們學習如何實作和測試CompositeItemReader
,它允許我們按照特定順序處理來自多個來源的資料。透過將讀取器連結在一起,我們可以按照特定順序處理來自檔案、資料庫或其他來源的資料。
與往常一樣,原始碼可在 GitHub 上取得。