如何在 Spring Batch 中執行多個作業
一、簡介
Spring Batch是一個強大的框架,透過提供可重複使用的元件和可靠的基礎設施,使處理大量資料變得輕而易舉。在現實場景中,應用程式通常需要按照特定的執行順序同時執行多個作業,以優化效能並有效管理依賴關係。
在本教程中,我們將探索在 Spring Batch 中執行多個作業的各種方法。
2. 了解 Spring Batch 作業
在 Spring Batch 的脈絡中,作業是一系列步驟的容器,代表整個流程。每個作業都有一個唯一的標識符,並且可以由按順序或基於特定條件執行的多個步驟組成。我們可以使用 XML 或 Java 配置作業, JobLauncher
通常會啟動它們。
運行多個作業在以下場景中是有益的:
- 平行處理
- 資料遷移和 ETL 流程
- 報告生成等
有效管理多個作業對於實現最佳效能、可維護性和可擴展性至關重要。讓我們來探索一下在 Spring Batch 中實現這一目標的不同方法。
3. 配置
首先,讓我們配置我們的依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
<version>2.2.224</version>
</dependency>
我們新增了[spring-boot-starter-web](https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web/3.3.2) ,
基本 Spring Boot 依賴項、用於批次的spring-boot-starter-batch
以及用於記憶體資料庫的h2
。
接下來,讓我們啟用批次並配置我們的資料來源:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public DataSource dataSource() {
return DataSourceBuilder.create()
.driverClassName("org.h2.Driver")
.url("jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
.username("sa")
.password("")
.build();
}
@Bean
public DatabasePopulator databasePopulator(DataSource dataSource) {
ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
populator.setContinueOnError(false);
populator.execute(dataSource);
return populator;
}
}
現在,讓我們建立兩個不同的作業作為範例。每個作業將執行一個簡單的任務:
@Configuration
public class JobsConfig {
private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);
@Bean
public Job jobOne(JobRepository jobRepository, Step stepOne) {
return new JobBuilder("jobOne", jobRepository).start(stepOne)
.build();
}
@Bean
public Step stepOne(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("stepOne", jobRepository).tasklet((contribution, chunkContext) -> {
log.info("Hello");
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public Job jobTwo(JobRepository jobRepository, Step stepTwo) {
return new JobBuilder("jobTwo", jobRepository).start(stepTwo)
.build();
}
@Bean
public Step stepTwo(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("stepTwo", jobRepository).tasklet((contribution, chunkContext) -> {
log.info("World");
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
}
@EnableBatchProcessing
註解設定基本的 Spring Batch 元件,例如JobLauncher
、 JobRepository
和JobExplorer
。
我們將兩個獨立的作業jobOne
和jobTwo
定義為 Spring bean。每個作業都有自己獨特的配置和步驟,我們將在這些方法中定義它們。這些步驟是具有事務支援的簡單微線程,記錄訊息以確認每個步驟何時執行。
讓我們確認一下職位的定義:
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Test
void givenJobsDefinitions_whenJobsLoaded_thenJobNamesShouldMatch() {
assertNotNull(jobOne, "jobOne should be defined");
assertEquals("jobOne", jobOne.getName());
assertNotNull(jobTwo, "jobTwo should be defined");
assertEquals("jobTwo", jobTwo.getName());
}
4. 順序作業執行
如果我們的作業需要一個接一個地運行,特別是當它們依賴彼此的輸出時,順序執行是一種可行的方法。讓我們透過一個例子來看看它是如何運作的。
`@Component
public class SequentialJobsConfig {
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Autowired
private JobLauncher jobLauncher;
public void runJobsSequentially() {
JobParameters jobParameters = new JobParametersBuilder().addString("ID", "Sequential 1")
.toJobParameters();
JobParameters jobParameters2 = new JobParametersBuilder().addString("ID", "Sequential 2")
.toJobParameters();
// Run jobs one after another
try {
jobLauncher.run(jobOne, jobParameters);
jobLauncher.run(jobTwo, jobParameters2);
} catch (Exception e) {
// handle exception
e.printStackTrace();
}
}
}`
我們定義了一個名為SequentialJobsConfig
的元件,並將先前建立的兩個作業加入到該類別中。然後,使用JobLauncher
執行作業。我們建構了jobParameters
透過使用addString()
方法新增ID
來確保每個作業實例都是唯一的。這種方法允許我們控制執行流程並在繼續下一個作業之前檢查每個作業的結果。
讓我們檢查作業是否成功運行:
@Autowired
private SequentialJobsConfig sequentialJobsConfig;
@Test
void givenSequentialJobs_whenExecuted_thenRunJobsInOrder() {
assertDoesNotThrow(() -> sequentialJobsConfig.runJobsSequentially(), "Sequential job execution should execute");
}
5. 並行作業執行
在某些情況下,我們的作業彼此不依賴,並行運行它們可以縮短執行時間。我們可以利用 Spring 的TaskExecutor
介面來實現這一點:
@Component
public class ParallelJobService {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
public void runJobsInParallel() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.execute(() -> {
try {
jobLauncher.run(jobOne, new JobParametersBuilder().addString("ID", "Parallel 1")
.toJobParameters());
} catch (Exception e) {
e.printStackTrace();
}
});
taskExecutor.execute(() -> {
try {
jobLauncher.run(jobTwo, new JobParametersBuilder().addString("ID", "Parallel 2")
.toJobParameters());
} catch (Exception e) {
e.printStackTrace();
}
});
taskExecutor.close();
}
}
在此配置中,我們使用 Spring 的SimpleAsyncTaskExecutor
透過JobLauncher
啟動作業。
然而,在使用並行方式時,我們需要考慮執行緒安全、資源爭用、事務管理等因素,以確保穩定且有效率的執行。
6. 使用作業調度
有時,我們不僅想執行多個作業,而是希望在特定時間或間隔內執行這些作業。這就是作業調度發揮作用的地方。這可以使用 Spring 的調度支援或外部調度程序輕鬆完成。
6.1.使用Spring的@Scheduling
@Scheduled
註解允許方法(作業)以給定的時間間隔重複執行。此方法需要使用@EnableScheduling
註解啟用調度。
讓我們建立一個帶有所需註解的ScheduledJobs
類別來設定我們的作業:
@Configuration
@EnableScheduling
public class ScheduledJobs {
private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);
@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Autowired
private JobLauncher jobLauncher;
@Scheduled(cron = "0 */1 * * * *") // Run every minute
public void runJob1() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
log.info("Executing sheduled job 1");
jobLauncher.run(jobOne, jobParameters);
}
@Scheduled(fixedRate = 1000 * 60 * 3) // Run every 3 minutes
public void runJob2() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
log.info("Executing sheduled job 2");
jobLauncher.run(jobTwo, jobParameters);
}
}
在此範例中,我們使用了上一節中建立的作業類別。我們將jobOne
配置為每分鐘運行一次,而jobTwo
配置為每 3 分鐘運行一次。 @Scheduled
註解允許使用固定速率或 cron 表達式定義簡單到複雜的調度模式。
6.2.使用 Quartz 調度程序
Quartz 調度程式是一個功能強大的程式庫,用於在 Java 應用程式中排程任務。就像@Scheduling
一樣,Quartz 允許在特定時間間隔運行多個作業。為了能夠使用 Quartz,我們需要加入spring-boot-starter-quartz
依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>3.3.2</version>
</dependency>
接下來,我們建立兩個作業, QuartzJobOne
和QuartzJobTwo
:
@Component
public class QuartzJobOne implements Job {
private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
log.info("Job One is executing from quartz");
} catch (Exception e) {
log.error("Error executing Job One: {}", e.getMessage(), e);
throw new JobExecutionException(e);
}
}
}
@Component
public class QuartzJobTwo implements Job {
private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
log.info("Job Two is executing from quartz");
} catch (Exception e) {
log.error("Error executing Job Two: {}", e.getMessage(), e);
throw new JobExecutionException(e);
}
}
}
現在,讓我們為每個作業定義兩個 bean、 JobDetail
和一個Trigger
:
@Configuration
public class QuartzConfig {
@Autowired
private Job quartzJobOne;
@Autowired
private Job quartzJobTwo;
@Bean
public JobDetail job1Detail() {
return JobBuilder.newJob().ofType(quartzJobOne.getClass())
.withIdentity("quartzJobOne", "group1")
.storeDurably()
.build();
}
@Bean
public JobDetail job2Detail() {
return JobBuilder.newJob().ofType(quartzJobTwo.getClass())
.withIdentity("quartzJobTwo", "group1")
.storeDurably()
.build();
}
@Bean
public Trigger job1Trigger(JobDetail job1Detail) {
return TriggerBuilder.newTrigger()
.forJob(job1Detail)
.withIdentity("quartzJobOneTrigger", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
.build();
}
@Bean
public Trigger job2Trigger(JobDetail job2Detail) {
return TriggerBuilder.newTrigger()
.forJob(job2Detail)
.withIdentity("quartzJobTwoTrigger", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/15 * * * * ?"))
.build();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() {
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
schedulerFactory.setJobDetails(job1Detail(), job2Detail());
schedulerFactory.setTriggers(job1Trigger(job1Detail()), job2Trigger(job2Detail()));
return schedulerFactory;
}
}
我們使用JobBuilder
為 Quartz 作業建立了一個JobDetail
,分別指定作業類別及其識別。其次,我們建立了一個觸發器,並使用 cron 表達式定義了作業何時運行,該表達式分別安排作業每 10 秒和 15 秒運行一次。
我們會自動在schedulerFactoryBean
bean 中啟動我們的作業。運行quartz作業的方法有很多種,包括使用參數運行作業、使用日曆進行排程以及暫停和恢復作業。
Quartz具有高度的靈活性,支援複雜的調度場景。但是,它需要額外的設置,並且比使用 @Scheduling 更複雜。
7. 動態作業執行
我們已經介紹了使用 Spring Batch 執行多個作業的幾種方法,這些方法要求我們預先靜態配置和定義我們的作業。然而,在某些情況下,我們希望根據某些執行時間條件按需建立作業。在使用 Spring Batch 時,我們可以像往常一樣使用面向區塊或基於任務的方法來完成此任務。對於此範例,我們將使用基於區塊的方法。
在以區塊為導向的方法中,每個作業的資料從ItemReader
讀取,然後由ItemProcessor
處理。隨後讀取和處理的區塊傳遞給ItemWriter
。
讓我們建立一個DynamicJobService
類別並定義負責執行作業的方法:
`@Service
public class DynamicJobService {
private final JobRepository jobRepository;
private final JobLauncher jobLauncher;
private final PlatformTransactionManager transactionManager;
public DynamicJobService(JobRepository jobRepository, JobLauncher jobLauncher, PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.jobLauncher = jobLauncher;
this.transactionManager = transactionManager;
}
public void createAndRunJob(Map<String, List
List
// Create chunk-oriented jobs
for (Map.Entry<String, List
if (entry.getValue() instanceof List) {
jobs.add(createJob(entry.getKey(), entry.getValue()));
}
}
// Run all jobs
for (Job job : jobs) {
JobParameters jobParameters = new JobParametersBuilder().addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
private Job createJob(String jobName, List
return new JobBuilder(jobName, jobRepository).start(createStep(data))
.build();
}
private Step createStep(List
return new StepBuilder("step", jobRepository).<String, String> chunk(10, transactionManager)
.reader(new ListItemReader<>(data))
.processor(item -> item.toUpperCase())
.writer(items -> items.forEach(System.out::println))
.build();
}
}`
在上面的範例中,我們建立了一個名為createAndRunJob
的方法,該方法根據jobsData
產生作業並啟動它們。以下是執行期間發生的情況:
reader()
方法一次從輸入清單中讀取一項。每個項目都會傳遞給processor()
,該處理器將項目的第一個字母轉換為大寫。然後將處理後的項目收集到一個區塊中,區塊大小定義為 10 writer()
隨後,編寫器將區塊中的所有項目列印到控制台,並重複此過程,直到處理完所有項目。
讓我們看看該服務的實際運作:
`@Autowired
private DynamicJobService dynamicJobService;
@Test
void givenJobData_whenJobsCreated_thenJobsRunSeccessfully() throws Exception {
Map<String, List
jobsData.put("chunkJob1", Arrays.asList("data1", "data2", "data3"));
jobsData.put("chunkJob2", Arrays.asList("data4", "data5", "data6"));
assertDoesNotThrow(() -> dynamicJobService.createAndRunJob(jobsData), "Dynamic job creation and execution should run successfully");
}`
我們建立了兩個作業並將其傳遞給服務的createAndRunJob
方法,每個作業都有一個作業標識及其資料。
在現實世界的範例中,我們可能會運行更複雜的處理邏輯。如果內建實作不能滿足我們的特定要求,最好分別建立ItemReader
、 ItemProcessor
和ItemWriter
的自訂實作。
八、結論
在本文中,我們探索了一些使用 Spring Batch 執行多個作業的方法。透過了解本文中使用的基本範例,我們可以設計一個更有效率、可擴展且可維護的批次系統。
無論我們應該使用哪種方法,都應該取決於最適合我們特定需求的方法。
完整的實作可以在 GitHub 上找到。