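1. High development productivity
Spring Batch raises developer productivity by supplying the infrastructure that batch applications would otherwise rebuild by hand. Its domain model (Job, Step, and the job metadata kept in the JobRepository) covers simple jobs and extends to more complex flows. For a typical step, the developer defines only the input (ItemReader), the processing (ItemProcessor), and the output (ItemWriter), without writing the boilerplate and recurring patterns that a hand-rolled batch program needs. Spring Batch also ships components such as JobLauncher and JobOperator, which make starting and managing jobs straightforward.
The following example reads person records from an XML file and from a database, upper-cases some fields, and writes the results to an XML file and a text file. It defines a single Job with two Steps: the first reads the XML file, the second reads the database.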
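import java.util.Arrays;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

// Targets Spring Batch 4.x: JobBuilderFactory and StepBuilderFactory are deprecated in 5.0.
// Person is an application class not shown here; it needs JAXB annotations
// (for example @XmlRootElement(name = "person")) and getters/setters for the mapped fields.
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Autowired
    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    // Jaxb2Marshaller implements both Marshaller and Unmarshaller, so one bean serves reader and writer.
    @Bean
    public Jaxb2Marshaller personMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(Person.class);
        return marshaller;
    }

    // Reads each <person> fragment of persons.xml on the classpath.
    @Bean
    public StaxEventItemReader<Person> xmlPersonReader() {
        StaxEventItemReader<Person> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("persons.xml"));
        reader.setFragmentRootElementName("person");
        reader.setUnmarshaller(personMarshaller());
        return reader;
    }

    // Streams rows from the person table through a JDBC cursor.
    @Bean
    public JdbcCursorItemReader<Person> dbPersonReader(DataSource dataSource) {
        JdbcCursorItemReader<Person> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("select id, first_name, last_name, email, age from person");
        reader.setRowMapper((resultSet, rowNum) -> {
            Person person = new Person();
            person.setId(resultSet.getLong("id"));
            person.setFirstName(resultSet.getString("first_name"));
            person.setLastName(resultSet.getString("last_name"));
            person.setEmail(resultSet.getString("email"));
            person.setAge(resultSet.getInt("age"));
            return person;
        });
        return reader;
    }

    // Writes to persons.xml in the working directory (a different file from the classpath input).
    @Bean
    public StaxEventItemWriter<Person> xmlPersonWriter() {
        StaxEventItemWriter<Person> writer = new StaxEventItemWriter<>();
        writer.setResource(new FileSystemResource("persons.xml"));
        writer.setRootTagName("persons");
        writer.setMarshaller(personMarshaller());
        writer.setOverwriteOutput(true);
        return writer;
    }

    // Appends tab-delimited lines to persons.txt; both steps share this writer.
    @Bean
    public FlatFileItemWriter<Person> txtPersonWriter() {
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("persons.txt"));
        writer.setLineAggregator(lineAggregator());
        writer.setAppendAllowed(true);
        // The writer adds the line separator after the header, so no trailing "\n" here.
        writer.setHeaderCallback(out -> out.write("Id\tFirst Name\tLast Name\tEmail\tAge"));
        return writer;
    }

    @Bean
    public DelimitedLineAggregator<Person> lineAggregator() {
        DelimitedLineAggregator<Person> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter("\t");
        aggregator.setFieldExtractor(fieldExtractor());
        return aggregator;
    }

    @Bean
    public BeanWrapperFieldExtractor<Person> fieldExtractor() {
        BeanWrapperFieldExtractor<Person> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"id", "firstName", "lastName", "email", "age"});
        return extractor;
    }

    // A step accepts a single writer: calling .writer(...) twice keeps only the last one.
    // CompositeItemWriter passes every chunk to each delegate in order.
    @Bean
    public CompositeItemWriter<Person> xmlAndTxtPersonWriter() {
        CompositeItemWriter<Person> writer = new CompositeItemWriter<>();
        writer.setDelegates(Arrays.asList(txtPersonWriter(), xmlPersonWriter()));
        return writer;
    }

    @Bean
    public ItemProcessor<Person, Person> personUpperProcessor() {
        return person -> {
            person.setFirstName(person.getFirstName().toUpperCase());
            person.setLastName(person.getLastName().toUpperCase());
            person.setEmail(person.getEmail().toUpperCase());
            return person;
        };
    }

    @Bean
    public Step xmlPersonStep() {
        return stepBuilderFactory.get("xmlPersonReadingStep")
                .<Person, Person>chunk(10)
                .reader(xmlPersonReader())
                .processor(personUpperProcessor())
                .writer(xmlAndTxtPersonWriter())
                .build();
    }

    @Bean
    public Step dbPersonStep(JdbcCursorItemReader<Person> dbPersonReader) {
        return stepBuilderFactory.get("dbPersonReadingStep")
                .<Person, Person>chunk(10)
                .reader(dbPersonReader)
                .processor(personUpperProcessor())
                .writer(txtPersonWriter())
                .build();
    }

    // One job, two steps: the XML step runs first, then the database step.
    @Bean
    public Job job(Step xmlPersonStep, Step dbPersonStep) {
        return jobBuilderFactory.get("personJob")
                .incrementer(new RunIdIncrementer())
                .start(xmlPersonStep)
                .next(dbPersonStep)
                .build();
    }
}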
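To make the reader/processor/writer split more concrete, here is a minimal plain-Java sketch of the loop that a chunk-oriented step runs on the developer's behalf. It is a simplified model under stated assumptions, not Spring Batch's implementation: `ChunkLoopSketch` and `run` are hypothetical names, the reader is modeled as an `Iterator`, and transactions, restart state, and skip/retry handling are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified model of a chunk-oriented step (hypothetical, not Spring Batch code). */
final class ChunkLoopSketch {

    private ChunkLoopSketch() {
    }

    /**
     * Reads up to chunkSize items, processes each one, then hands the whole
     * chunk to the writer; repeats until the reader is exhausted.
     *
     * @return the number of chunks passed to the writer
     */
    static <I, O> int run(Iterator<I> reader,
                          Function<I, O> processor,
                          Consumer<List<O>> writer,
                          int chunkSize) {
        int chunksWritten = 0;
        while (reader.hasNext()) {
            // 1. Read: collect at most chunkSize input items.
            List<I> inputs = new ArrayList<>(chunkSize);
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process: transform item by item; a null result drops the item,
            //    mirroring how a null from an ItemProcessor filters it out.
            List<O> outputs = new ArrayList<>(inputs.size());
            for (I input : inputs) {
                O output = processor.apply(input);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // 3. Write: the writer receives the chunk as one list.
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
                chunksWritten++;
            }
        }
        return chunksWritten;
    }
}
```

With five input items and a chunk size of 2, the writer in this sketch is called three times, with chunks of 2, 2, and 1 items. In the configuration above, `chunk(10)` plays the role of `chunkSize`.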
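2. High-performance processing of large data volumes
Spring Batch is designed to process large volumes of data efficiently. A chunk-oriented step reads and processes items one at a time but writes and commits them a chunk at a time, which keeps memory use bounded and spreads the transaction overhead over many items. When a single thread is not enough, a step can be scaled out: it can process its chunks on multiple threads, independent steps can run in parallel, and a step can be partitioned so that each partition handles its own slice of the data. These options let a job take advantage of multi-core processors.
The following code shows a multi-threaded step that processes a large file. Note that FlatFileItemReader is not thread-safe, so access to it must be synchronized in a multi-threaded step, and the output order is not guaranteed to match the input order.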
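import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.SynchronizedItemStreamWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Targets Spring Batch 4.3+ (SynchronizedItemStreamWriter was added in 4.3).
// FileContent and FileContentFieldSetMapper are application classes not shown here;
// FileContent is assumed to be the item type with id, filename and content properties.
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Autowired
    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    // FlatFileItemReader is not thread-safe, so the step reads through a synchronized wrapper.
    @Bean
    public SynchronizedItemStreamReader<FileContent> reader() {
        FlatFileItemReader<FileContent> delegate = new FlatFileItemReader<>();
        delegate.setResource(new ClassPathResource("data.txt"));
        delegate.setLineMapper(lineMapper());
        // Restart position is unreliable when several threads share one reader.
        delegate.setSaveState(false);

        SynchronizedItemStreamReader<FileContent> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(delegate);
        return reader;
    }

    // Splits each comma-separated line into id, filename and content.
    @Bean
    public LineMapper<FileContent> lineMapper() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setDelimiter(",");
        tokenizer.setNames(new String[] {"id", "filename", "content"});

        DefaultLineMapper<FileContent> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FileContentFieldSetMapper());
        return lineMapper;
    }

    @Bean
    public ItemProcessor<FileContent, FileContent> processor() {
        return fileContent -> {
            fileContent.setContent(fileContent.getContent().toUpperCase());
            return fileContent;
        };
    }

    // The file writer is wrapped as well so that concurrent chunk writes do not interleave.
    @Bean
    public SynchronizedItemStreamWriter<FileContent> writer() {
        BeanWrapperFieldExtractor<FileContent> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"id", "filename", "content"});

        DelimitedLineAggregator<FileContent> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter(",");
        aggregator.setFieldExtractor(extractor);

        FlatFileItemWriter<FileContent> delegate = new FlatFileItemWriter<>();
        delegate.setResource(new FileSystemResource("processed-data.txt"));
        delegate.setLineAggregator(aggregator);

        SynchronizedItemStreamWriter<FileContent> writer = new SynchronizedItemStreamWriter<>();
        writer.setDelegate(delegate);
        return writer;
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<FileContent, FileContent>chunk(1000)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .taskExecutor(taskExecutor())
                // The default throttle limit is 4; raise it to match the core pool size.
                .throttleLimit(10)
                .build();
    }

    // Spring initializes the executor as a bean, so afterPropertiesSet() is not called by hand.
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(20);
        return taskExecutor;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}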
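What a multi-threaded step has to get right can be sketched in plain Java: several workers pull from one shared source, so every read must be atomic, and chunks are written in whatever order the workers finish. The sketch below is a simplified model under stated assumptions, not Spring Batch's implementation; `MultiThreadedStepSketch`, `SynchronizedReader`, and `run` are hypothetical names, and `null` is assumed to mark the end of input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Simplified model of a multi-threaded step (hypothetical, not Spring Batch code). */
final class MultiThreadedStepSketch {

    private MultiThreadedStepSketch() {
    }

    /** Makes each read atomic; returns null once the source is exhausted. */
    static final class SynchronizedReader<T> {
        private final Iterator<T> delegate;

        SynchronizedReader(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        synchronized T read() {
            return delegate.hasNext() ? delegate.next() : null;
        }
    }

    /** Processes all (non-null) source items on the given number of threads. */
    static <I, O> List<O> run(Iterator<I> source, Function<I, O> processor,
                              int chunkSize, int threads) {
        SynchronizedReader<I> reader = new SynchronizedReader<>(source);
        List<O> sink = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                while (true) {
                    // Each worker builds its own chunk from the shared reader.
                    List<O> chunk = new ArrayList<>(chunkSize);
                    I item;
                    while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                        chunk.add(processor.apply(item));
                    }
                    if (chunk.isEmpty()) {
                        return; // source exhausted
                    }
                    sink.addAll(chunk); // "write" the chunk; order across workers is arbitrary
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return sink;
    }
}
```

Every item is processed exactly once, but the order of the result list varies from run to run. The same trade-off applies to a multi-threaded Spring Batch step: it suits jobs where output order does not matter.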
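3. Easy to maintain and test
Spring Batch is built with maintenance and testing in mind. Jobs and steps are assembled from small, separate components, so each one is easy to define, modify, and test on its own. The core abstractions ItemReader, ItemWriter, and ItemProcessor are plain interfaces, so an implementation can be unit-tested without starting a job, and the spring-batch-test module provides helpers such as JobLauncherTestUtils for integration tests of steps and whole jobs.
The following code unit-tests an item processor with JUnit: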
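import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.Test;
import org.springframework.batch.item.ItemProcessor;

// PersonItemProcessor, Person and PersonDto are application classes not shown in this article.
public class PersonItemProcessorTest {

    // Typed as ItemProcessor<Person, PersonDto> so that process() returns a PersonDto.
    private final ItemProcessor<Person, PersonDto> processor = new PersonItemProcessor();

    @Test
    public void testProcessor() throws Exception {
        Person person = new Person();
        person.setFirstName("Alice");
        person.setLastName("Smith");
        person.setEmail("alice.smith@example.com");
        person.setAge(30);

        PersonDto personDto = processor.process(person);

        // The processor is expected to join and upper-case the name and keep the email as-is.
        assertNotNull(personDto);
        assertEquals("ALICE SMITH", personDto.getName());
        assertEquals("alice.smith@example.com", personDto.getEmail());
    }
}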
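The test refers to `PersonItemProcessor`, `Person`, and `PersonDto` without showing them. The sketch below is one hypothetical shape for these types that would satisfy the assertions; it is an assumption made for illustration, not the article's actual code. To stay free of dependencies, the processor here is a plain class; in a Spring Batch project it would implement `ItemProcessor<Person, PersonDto>`.

```java
import java.util.Locale;

/** Hypothetical shapes for the types used in the test (not shown in the article). */
final class PersonProcessorSketch {

    private PersonProcessorSketch() {
    }

    /** Input item: a mutable bean, as the setters in the test suggest. */
    static final class Person {
        private String firstName;
        private String lastName;
        private String email;
        private int age;

        String getFirstName() { return firstName; }
        void setFirstName(String firstName) { this.firstName = firstName; }
        String getLastName() { return lastName; }
        void setLastName(String lastName) { this.lastName = lastName; }
        String getEmail() { return email; }
        void setEmail(String email) { this.email = email; }
        int getAge() { return age; }
        void setAge(int age) { this.age = age; }
    }

    /** Output item: an immutable value with a combined name. */
    static final class PersonDto {
        private final String name;
        private final String email;

        PersonDto(String name, String email) {
            this.name = name;
            this.email = email;
        }

        String getName() { return name; }
        String getEmail() { return email; }
    }

    /** Joins first and last name, upper-cases the result, and leaves the email untouched. */
    static final class PersonItemProcessor {
        PersonDto process(Person person) {
            String fullName = person.getFirstName() + " " + person.getLastName();
            return new PersonDto(fullName.toUpperCase(Locale.ROOT), person.getEmail());
        }
    }
}
```

Because the processor has no dependency on a running job, a test can construct it with `new` and call `process` directly, which is what makes this kind of unit test cheap.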
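4. Drawbacks
The main drawback of Spring Batch is its learning curve: it takes time to learn the core concepts and how they are used. In addition, some advanced capabilities need extra configuration. Spring Batch is not a scheduler, so jobs are typically triggered by an external scheduler, and monitoring has to be set up separately. Using these features comfortably takes some experience.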
Original article by OQANF. If you repost it, please credit the source: https://www.506064.com/n/333138.html