C、任务决策
决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则之心 step1 和 step3。
@Component public class MyDecider implements JobExecutionDecider { @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { LocalDate now = LocalDate.now(); DayOfWeek dayOfWeek = now.getDayOfWeek(); if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) { return new FlowExecutionStatus("weekend"); } else { return new FlowExecutionStatus("workingDay"); } } } @Bean public Job deciderJob() { return jobBuilderFactory.get("deciderJob") .start(step1()) .next(myDecider) .from(myDecider).on("weekend").to(step2()) .from(myDecider).on("workingDay").to(step3()) .from(step3()).on("*").to(step4()) .end() .build(); } private Step step1() { return stepBuilderFactory.get("step1") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤一操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Step step2() { return stepBuilderFactory.get("step2") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤二操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Step step3() { return stepBuilderFactory.get("step3") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤三操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Step step4() { return stepBuilderFactory.get("step4") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤四操作。。。"); return RepeatStatus.FINISHED; }).build(); }
D、任务嵌套
任务 Job 除了可以由 Step 或者 Flow 构成外,我们还可以将多个任务 Job 转换为特殊的 Step,然后再赋给另一个任务 Job,这就是任务的嵌套。
@Component public class NestedJobDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private JobLauncher jobLauncher; @Autowired private JobRepository jobRepository; @Autowired private PlatformTransactionManager platformTransactionManager; // 父任务 @Bean public Job parentJob() { return jobBuilderFactory.get("parentJob") .start(childJobOneStep()) .next(childJobTwoStep()) .build(); } // 将任务转换为特殊的步骤 private Step childJobOneStep() { return new JobStepBuilder(new StepBuilder("childJobOneStep")) .job(childJobOne()) .launcher(jobLauncher) .repository(jobRepository) .transactionManager(platformTransactionManager) .build(); } // 将任务转换为特殊的步骤 private Step childJobTwoStep() { return new JobStepBuilder(new StepBuilder("childJobTwoStep")) .job(childJobTwo()) .launcher(jobLauncher) .repository(jobRepository) .transactionManager(platformTransactionManager) .build(); } // 子任务一 private Job childJobOne() { return jobBuilderFactory.get("childJobOne") .start( stepBuilderFactory.get("childJobOneStep") .tasklet((stepContribution, chunkContext) -> { System.out.println("子任务一执行步骤。。。"); return RepeatStatus.FINISHED; }).build() ).build(); } // 子任务二 private Job childJobTwo() { return jobBuilderFactory.get("childJobTwo") .start( stepBuilderFactory.get("childJobTwoStep") .tasklet((stepContribution, chunkContext) -> { System.out.println("子任务二执行步骤。。。"); return RepeatStatus.FINISHED; }).build() ).build(); } }
4.2、读取数据
定义 Model TestData
,下面同一
@Data public class TestData { private int id; private String field1; private String field2; private String field3; }
读取数据包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等,具体自己查资料。
文本数据读取 Demo
@Component public class FileItemReaderDemo { // 任务创建工厂 @Autowired private JobBuilderFactory jobBuilderFactory; // 步骤创建工厂 @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job fileItemReaderJob() { return jobBuilderFactory.get("fileItemReaderJob2") .start(step()) .build(); } private Step step() { return stepBuilderFactory.get("step") .<TestData, TestData>chunk(2) .reader(fileItemReader()) .writer(list -> list.forEach(System.out::println)) .build(); } private ItemReader<TestData> fileItemReader() { FlatFileItemReader<TestData> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("reader/file")); // 设置文件资源地址 reader.setLinesToSkip(1); // 忽略第一行 // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取, // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 设置属性名,类似于表头 tokenizer.setNames("id", "field1", "field2", "field3"); // 将每行数据转换为TestData对象 DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>(); // 设置LineTokenizer mapper.setLineTokenizer(tokenizer); // 设置映射方式,即读取到的文本怎么转换为对应的POJO mapper.setFieldSetMapper(fieldSet -> { TestData data = new TestData(); data.setId(fieldSet.readInt("id")); data.setField1(fieldSet.readString("field1")); data.setField2(fieldSet.readString("field2")); data.setField3(fieldSet.readString("field3")); return data; }); reader.setLineMapper(mapper); return reader; } }
4.3、输出数据
输出数据也包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等
Component public class FileItemWriterDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Resource(name = "writerSimpleReader") private ListItemReader<TestData> writerSimpleReader; @Bean public Job fileItemWriterJob() throws Exception { return jobBuilderFactory.get("fileItemWriterJob") .start(step()) .build(); } private Step step() throws Exception { return stepBuilderFactory.get("step") .<TestData, TestData>chunk(2) .reader(writerSimpleReader) .writer(fileItemWriter()) .build(); } private FlatFileItemWriter<TestData> fileItemWriter() throws Exception { FlatFileItemWriter<TestData> writer = new FlatFileItemWriter<>(); FileSystemResource file = new FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file"); Path path = Paths.get(file.getPath()); if (!Files.exists(path)) { Files.createFile(path); } // 设置输出文件路径 writer.setResource(file); // 把读到的每个TestData对象转换为JSON字符串 LineAggregator<TestData> aggregator = item -> { try { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(item); } catch (JsonProcessingException e) { e.printStackTrace(); } return ""; }; writer.setLineAggregator(aggregator); writer.afterPropertiesSet(); return writer; } }
4.5、处理数据
@Component public class ValidatingItemProcessorDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Resource(name = "processorSimpleReader") private ListItemReader<TestData> processorSimpleReader; @Bean public Job validatingItemProcessorJob() throws Exception { return jobBuilderFactory.get("validatingItemProcessorJob3") .start(step()) .build(); } private Step step() throws Exception { return stepBuilderFactory.get("step") .<TestData, TestData>chunk(2) .reader(processorSimpleReader) .processor(beanValidatingItemProcessor()) .writer(list -> list.forEach(System.out::println)) .build(); } // private ValidatingItemProcessor<TestData> validatingItemProcessor() { // ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>(); // processor.setValidator(value -> { // // 对每一条数据进行校验 // if ("".equals(value.getField3())) { // // 如果field3的值为空串,则抛异常 // throw new ValidationException("field3的值不合法"); // } // }); // return processor; // } private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception { BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>(); // 开启过滤,不符合规则的数据被过滤掉; // beanValidatingItemProcessor.setFilter(true); beanValidatingItemProcessor.afterPropertiesSet(); return beanValidatingItemProcessor; } }
4.6、任务调度
可以配合 quartz 或者 xxljob 实现定时任务执行
@RestController @RequestMapping("job") public class JobController { @Autowired private Job job; @Autowired private JobLauncher jobLauncher; @GetMapping("launcher/{message}") public String launcher(@PathVariable String message) throws Exception { JobParameters parameters = new JobParametersBuilder() .addString("message", message) .toJobParameters(); // 将参数传递给任务 jobLauncher.run(job, parameters); return "success"; } }