Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch 是 Spring 的一个子项目,使用 Java 语言并基于 Spring 框架开发,使得已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务。
Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,Spring Batch 同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过 Spring Batch 能够支持简单的、复杂的和大数据量的批处理作业。
Spring Batch 是一个批处理应用框架,不是调度框架,但需要和调度框架合作来构建完整的批处理任务。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。如果需要使用调度框架,在商业软件和开源软件中已经有很多优秀的企业级调度框架(如 Quartz、Tivoli、Control-M、Cron 等)可以使用。
1.现在实现Spring Batch方法,首先是配置:
/**
 * Spring Batch infrastructure configuration.
 *
 * <p>Declares the job repository, the job launcher and the job listener beans.
 * The data source and transaction manager are injected from the surrounding
 * application context.
 */
@Configuration
@EnableBatchProcessing
public class SpringBatchConfiguration {

    @Resource
    private DataSource dataSource;

    @Resource
    private PlatformTransactionManager transactionManager;

    /**
     * Creates the job repository from the injected data source and transaction
     * manager, with the database type fixed to {@code "mysql"}.
     *
     * @return the job repository produced by the factory bean
     * @throws Exception if the factory bean fails to build the repository
     */
    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setDatabaseType("mysql");
        factory.setTransactionManager(transactionManager);
        return factory.getObject();
    }

    /**
     * Creates the job launcher, wired to {@link #jobRepository()}.
     *
     * @return the configured launcher
     * @throws Exception if the job repository cannot be created
     */
    @Bean
    public SimpleJobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        return jobLauncher;
    }

    /**
     * Exposes the job listener as a bean.
     *
     * @return a new {@link TestJobListener}
     */
    @Bean
    public TestJobListener testJobListener() {
        return new TestJobListener();
    }
}
2.TestJobListener监听器
/**
 * Job listener that logs when a job starts and ends, together with the elapsed
 * wall-clock time in milliseconds.
 */
public class TestJobListener implements JobExecutionListener {

    private final Logger logger = Logger.getLogger(this.getClass().getName());

    /**
     * Start timestamp (epoch milliseconds) of the job running on the current thread.
     *
     * <p>Held per thread instead of in a plain field: one listener instance can be
     * shared by several job executions, and a single field would let overlapping
     * runs overwrite each other's start time.
     * NOTE(review): relies on beforeJob and afterJob being invoked on the same
     * thread for a given execution - confirm against the Spring Batch version in use.
     */
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();

    /**
     * Records the start time for the current thread and logs the job start.
     *
     * @param jobExecution the execution that is about to run (not inspected)
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime.set(System.currentTimeMillis());
        logger.info(">>job start");
    }

    /**
     * Logs the job end with the milliseconds elapsed since {@link #beforeJob}.
     *
     * @param jobExecution the execution that has finished (not inspected)
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        Long startedAt = startTime.get();
        // Clear the slot so a reused (pooled) thread does not keep a stale value.
        startTime.remove();
        // If no start was recorded, fall back to 0 like the original field default.
        long elapsed = System.currentTimeMillis() - (startedAt == null ? 0L : startedAt);
        logger.info(">>job end(" + elapsed + ")");
    }
}
3.读操作代码
public class TestItemReader2 extends FlatFileItemReader<BikeKey> { public TestItemReader2(){ } public void setData(String path,LineMapper<BikeKey> lineMapper){ this.setResource(new FileSystemResource(path)); this.setLineMapper(lineMapper); } } //LineMapper代码@Component public class TestItemLineMapper implements LineMapper<BikeKey> { @Override public BikeKey mapLine(String s, int i) throws Exception { System.out.println("mapLine..."+s+" i:"+i); String[] args = s.split(","); // 创建DeviceCommand对象 BikeKey bikeKey = new BikeKey(); bikeKey.setId(null); bikeKey.setStatus(0); bikeKey.setKeySn(args[1]); return bikeKey; } }
4.写操作代码
/**
 * Item writer that inserts every {@link BikeKey} of a chunk through the injected
 * {@link IBikeKeyService}, one item at a time.
 */
@Component
public class TestItemWriter implements ItemWriter<BikeKey> {

    @Resource
    private IBikeKeyService bikeKeyService;

    /**
     * Prints a trace line for each item and inserts it via the service.
     *
     * @param list the items of the current chunk
     * @throws Exception if the service call fails
     */
    @Override
    public void write(List<? extends BikeKey> list) throws Exception {
        final int total = list.size();
        int index = 0;
        for (BikeKey item : list) {
            System.out.println("write..." + total + " i:" + index);
            bikeKeyService.insert(item);
            index++;
        }
    }
}
5.处理过程代码
/**
 * Item processor that stamps each {@link BikeKey} with a creation time before it
 * is handed to the writer.
 */
@Component
public class TestItemProcessor implements ItemProcessor<BikeKey, BikeKey> {

    /**
     * Sets the key creation time on the given item and returns the same instance.
     *
     * @param bikeKey the item read from the input
     * @return the same item with its creation time set
     * @throws Exception declared by the processor contract; not thrown explicitly here
     */
    @Override
    public BikeKey process(BikeKey bikeKey) throws Exception {
        System.out.println("process...");
        // NOTE(review): presumably DateUtil.getTime() is in milliseconds, making
        // this a value in seconds - confirm against DateUtil.
        int createTime = (int) (DateUtil.getTime() / 1000);
        bikeKey.setKeyCreateTime(createTime);
        return bikeKey;
    }
}
6.工作统一调用代码
@Component public class TestDoImport { @Resource private JobLauncher jobLauncher; @Resource private JobBuilderFactory jobBuilderFactory; @Resource private StepBuilderFactory stepBuilderFactory; @Resource private TestJobListener jobListener; @Resource private TestItemLineMapper lineMapper; /** * 读操作 */ private TestItemReader2 reader; /** * 写操作 */ @Resource private TestItemWriter writer; /** * 处理过程 */ @Resource private TestItemProcessor processor; public TestDoImport(){ } public void doImport(){ /** * 批量任务的参数 */ JobParameters jobParameters = new JobParametersBuilder() .addLong("TIME",System.currentTimeMillis()) .toJobParameters(); try { /** * 设置数据路径 */ reader = new TestItemReader2(); reader.setData("d:/bike_key.csv",lineMapper); /** * 执行任务 */ jobLauncher.run(this.getJob(jobBuilderFactory,this.getStep(stepBuilderFactory,reader,writer,processor)),jobParameters); } catch (JobExecutionAlreadyRunningException e) { e.printStackTrace(); } catch (JobRestartException e) { e.printStackTrace(); } catch (JobInstanceAlreadyCompleteException e) { e.printStackTrace(); } catch (JobParametersInvalidException e) { e.printStackTrace(); } } /** * 获取一个工作实例 * @param jobs * @param step * @return */ private Job getJob(JobBuilderFactory jobs, Step step){ return jobs .get("importJob") .incrementer(new RunIdIncrementer()) .flow(step) .end() .listener(jobListener)//监听整个过程 .build(); } /** * 获取一个步骤实例 * @param stepBuilderFactory * @param reader 读 * @param writer 写 * @param processor 过程 * @return */ private Step getStep(StepBuilderFactory stepBuilderFactory, ItemReader<BikeKey> reader, ItemWriter<BikeKey> writer, ItemProcessor<BikeKey,BikeKey> processor){ return stepBuilderFactory .get("step1") .<BikeKey,BikeKey> chunk(1) .reader(reader) .processor(processor) .writer(writer) .build(); } }
运行日志:
23-May-2018 13:42:03.817 信息 [http-nio-8080-exec-6] com.mymvc.system.batch.listener.TestJobListener.beforeJob >>job start
mapLine…,a0011 i:1
process…
write…1 i:0
mapLine…,a0022 i:2
process…
write…1 i:0
mapLine…,a0033 i:3
process…
write…1 i:0
mapLine…,a0044 i:4
process…
write…1 i:0
mapLine…,a0055 i:5
process…
write…1 i:0
mapLine…,a0016 i:6
process…
write…1 i:0
mapLine…,a0027 i:7
process…
write…1 i:0
mapLine…,a0038 i:8
process…
write…1 i:0
mapLine…,a0049 i:9
process…
write…1 i:0
mapLine…,a00510 i:10
process…
write…1 i:0
mapLine…,a00111 i:11
process…
write…1 i:0
mapLine…,a00212 i:12
process…
write…1 i:0
mapLine…,a00313 i:13
process…
write…1 i:0
mapLine…,a00414 i:14
process…
write…1 i:0
mapLine…,a00515 i:15
process…
write…1 i:0
mapLine…,a00116 i:16
process…
write…1 i:0
mapLine…,a00217 i:17
process…
write…1 i:0
mapLine…,a00318 i:18
process…
write…1 i:0
mapLine…,a00419 i:19
process…
write…1 i:0
mapLine…,a00520 i:20
process…
write…1 i:0
23-May-2018 13:42:05.252 信息 [http-nio-8080-exec-6] com.mymvc.system.batch.listener.TestJobListener.afterJob >>job end(1435)