一、Batch是什么?
Spring Batch是一款成熟稳定的批处理框架。它利用Java实现了大型、复杂批处理作业处理机制。它允许开发人员构建批处理应用程序,从而更加容易地执行、过滤、排序和转换大量数据。Batch不仅适合大数据处理,同时也可以在调度及监控方面提供帮助。Spring Batch支持大多数的主流数据库,同时它还提供了强大的与spring生态系统集成的能力。
以下是简单的代码示例:
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job simpleJob() {
return jobBuilderFactory.get("simpleJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
System.out.println("Hello, world!");
return RepeatStatus.FINISHED;
}).build();
}
}
二、Batch核心概念
1. Job
在Spring Batch中,A Job是任务的顶级容器。它由一组Steps组成,定义了一个完整的、可独立执行的工作流程。
2. Step
在Spring Batch中,Step代表Job执行过程中的单个步骤。Step包含了一组处理步骤和决策,每个步骤中可以有一个Tasklet或一个Chunk。
3. Tasklet
Tasklet是一个独立的单元,可以执行一个事务。通常情况下,Tasklet可以完成一些批处理初始化工作、数据清理、文件复制或发送邮件等独立任务。
4. Chunk
Chunk指的是一次批处理中每次读取的一组数据,如从数据库中读取一批数据,然后逐一进行处理操作,处理完了再进行下一次读取操作。处理完成后,事务提交或回滚。Chunk事务通常适用于处理大规模数据时,可以减少内存的使用。
5. ItemReader
ItemReader负责从数据源中读取数据,并且将数据传递给ItemProcessor进行处理。
6. ItemProcessor
ItemProcessor用来处理读取到的数据,可以根据需要进行数据转换或过滤等操作。
7. ItemWriter
ItemWriter用来将处理后的数据写入目标数据源中。
8. JobRepository
JobRepository是一个持久化的、线程安全的Repository,它用来存储Job的状态信息,以及每个Step的执行状态信息。
三、使用Spring Batch实现批处理作业
1. 实现步骤
使用Spring Batch实现批处理作业的步骤如下:
1) 创建Job
在创建Job之前,需要先定义JobRepository和JobBuilderFactory。然后通过JobBuilderFactory创建Job。
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private JobRepository jobRepository;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.repository(jobRepository)
.start(step())
.build();
}
2) 创建Step
在创建Step之前,需要先定义StepBuilderFactory。然后通过StepBuilderFactory创建Step。
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step() {
return stepBuilderFactory.get("step")
.chunk(5)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
3) 创建ItemReader
在创建ItemReader之前,需要先定义DataSource,并使用JdbcCursorItemReader从数据源中获取数据。
@Autowired
private DataSource dataSource;
@Bean
public ItemReader reader() {
JdbcCursorItemReader reader = new JdbcCursorItemReader();
reader.setDataSource(dataSource);
reader.setSql("select name from person");
reader.setRowMapper((rs, rowNum) -> rs.getString("name"));
return reader;
}
4) 创建ItemProcessor
在创建ItemProcessor之前,需要定义ItemProcessor并实现process()方法。
@Bean
public ItemProcessor processor() {
return name -> "Hello, " + name + "!";
}
5) 创建ItemWriter
在创建ItemWriter之前,需要定义写入目标数据源。
@Autowired
private SomeRepository someRepository;
@Bean
public ItemWriter writer() {
return items -> someRepository.saveAll(items);
}
6) 运行Job
在创建完Job之后,可以直接运行Job。
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
public void run() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
2. 执行结果
执行上述代码后,将会输出以下结果:
Hello, Tom!
Hello, Jerry!
Hello, Mary!
Hello, John!
Hello, Bob!
四、总结
Spring Batch是一个成熟稳定的批处理框架,它由Job、Step、Tasklet、Chunk、ItemReader、ItemProcessor、ItemWriter和JobRepository等核心概念组成。使用Spring Batch可以帮助我们更加容易地执行、过滤、排序和转换大量数据,并且它还提供了强大的与spring生态系统集成的能力。
原创文章,作者:RADCA,如若转载,请注明出处:https://www.506064.com/n/334961.html
微信扫一扫
支付宝扫一扫