Spring Batch 批处理框架,真心强呀!! 下

简介: Spring Batch 批处理框架,真心强呀!! 下
C、任务决策

决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则之心 step1 和 step3。

@Component
public class MyDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        LocalDate now = LocalDate.now();
        DayOfWeek dayOfWeek = now.getDayOfWeek();
        if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
            return new FlowExecutionStatus("weekend");
        } else {
            return new FlowExecutionStatus("workingDay");
        }
    }
}
@Bean
public Job deciderJob() {
 return jobBuilderFactory.get("deciderJob")
   .start(step1())
   .next(myDecider)
   .from(myDecider).on("weekend").to(step2())
   .from(myDecider).on("workingDay").to(step3())
   .from(step3()).on("*").to(step4())
   .end()
   .build();
}
private Step step1() {
 return stepBuilderFactory.get("step1")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤一操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
private Step step2() {
 return stepBuilderFactory.get("step2")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤二操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
private Step step3() {
 return stepBuilderFactory.get("step3")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤三操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
private Step step4() {
 return stepBuilderFactory.get("step4")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤四操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
D、任务嵌套

任务 Job 除了可以由 Step 或者 Flow 构成外,我们还可以将多个任务 Job 转换为特殊的 Step,然后再赋给另一个任务 Job,这就是任务的嵌套。

@Component
public class NestedJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;
    // 父任务
    @Bean
    public Job parentJob() {
        return jobBuilderFactory.get("parentJob")
                .start(childJobOneStep())
                .next(childJobTwoStep())
                .build();
    }
    // 将任务转换为特殊的步骤
    private Step childJobOneStep() {
        return new JobStepBuilder(new StepBuilder("childJobOneStep"))
                .job(childJobOne())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
    // 将任务转换为特殊的步骤
    private Step childJobTwoStep() {
        return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
                .job(childJobTwo())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
    // 子任务一
    private Job childJobOne() {
        return jobBuilderFactory.get("childJobOne")
                .start(
                    stepBuilderFactory.get("childJobOneStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务一执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
    // 子任务二
    private Job childJobTwo() {
        return jobBuilderFactory.get("childJobTwo")
                .start(
                    stepBuilderFactory.get("childJobTwoStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务二执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
}

4.2、读取数据

定义 Model TestData,下面同一

@Data
public class TestData {
    private int id;
    private String field1;
    private String field2;
    private String field3;
}

读取数据包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等,具体自己查资料。

文本数据读取 Demo

@Component
public class FileItemReaderDemo {
    // 任务创建工厂
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    // 步骤创建工厂
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job fileItemReaderJob() {
        return jobBuilderFactory.get("fileItemReaderJob2")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(fileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
    private ItemReader<TestData> fileItemReader() {
        FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("reader/file")); // 设置文件资源地址
        reader.setLinesToSkip(1); // 忽略第一行
        // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
        // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // 设置属性名,类似于表头
        tokenizer.setNames("id", "field1", "field2", "field3");
        // 将每行数据转换为TestData对象
        DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();
        // 设置LineTokenizer
        mapper.setLineTokenizer(tokenizer);
        // 设置映射方式,即读取到的文本怎么转换为对应的POJO
        mapper.setFieldSetMapper(fieldSet -> {
            TestData data = new TestData();
            data.setId(fieldSet.readInt("id"));
            data.setField1(fieldSet.readString("field1"));
            data.setField2(fieldSet.readString("field2"));
            data.setField3(fieldSet.readString("field3"));
            return data;
        });
        reader.setLineMapper(mapper);
        return reader;
    }
}

4.3、输出数据

输出数据也包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等

Component
public class FileItemWriterDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Resource(name = "writerSimpleReader")
    private ListItemReader<TestData> writerSimpleReader;
    @Bean
    public Job fileItemWriterJob() throws Exception {
        return jobBuilderFactory.get("fileItemWriterJob")
                .start(step())
                .build();
    }
    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(writerSimpleReader)
                .writer(fileItemWriter())
                .build();
    }
    private FlatFileItemWriter<TestData> fileItemWriter() throws Exception {
        FlatFileItemWriter<TestData> writer = new FlatFileItemWriter<>();
        FileSystemResource file = new FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");
        Path path = Paths.get(file.getPath());
        if (!Files.exists(path)) {
            Files.createFile(path);
        }
        // 设置输出文件路径
        writer.setResource(file);
        // 把读到的每个TestData对象转换为JSON字符串
        LineAggregator<TestData> aggregator = item -> {
            try {
                ObjectMapper mapper = new ObjectMapper();
                return mapper.writeValueAsString(item);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return "";
        };
        writer.setLineAggregator(aggregator);
        writer.afterPropertiesSet();
        return writer;
    }
}

4.5、处理数据

@Component
public class ValidatingItemProcessorDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Resource(name = "processorSimpleReader")
    private ListItemReader<TestData> processorSimpleReader;
    @Bean
    public Job validatingItemProcessorJob() throws Exception {
        return jobBuilderFactory.get("validatingItemProcessorJob3")
                .start(step())
                .build();
    }
    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(processorSimpleReader)
                .processor(beanValidatingItemProcessor())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
//    private ValidatingItemProcessor<TestData> validatingItemProcessor() {
//        ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>();
//        processor.setValidator(value -> {
//            // 对每一条数据进行校验
//            if ("".equals(value.getField3())) {
//                // 如果field3的值为空串,则抛异常
//                throw new ValidationException("field3的值不合法");
//            }
//        });
//        return processor;
//    }
    private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception {
        BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
        // 开启过滤,不符合规则的数据被过滤掉;
//        beanValidatingItemProcessor.setFilter(true);
        beanValidatingItemProcessor.afterPropertiesSet();
        return beanValidatingItemProcessor;
    }
}

4.6、任务调度

可以配合 quartz 或者 xxljob 实现定时任务执行

@RestController
@RequestMapping("job")
public class JobController {
    @Autowired
    private Job job;
    @Autowired
    private JobLauncher jobLauncher;
    @GetMapping("launcher/{message}")
    public String launcher(@PathVariable String message) throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                .addString("message", message)
                .toJobParameters();
        // 将参数传递给任务
        jobLauncher.run(job, parameters);
        return "success";
    }
}


目录
打赏
0
0
0
0
579
分享
相关文章
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
Spring AI Alibaba 应用框架挑战赛圆满落幕,恭喜获奖选手
Spring AI Alibaba 应用框架挑战赛圆满落幕,恭喜获奖选手
Spring AI Alibaba 应用框架挑战赛圆满落幕,恭喜获奖选手
第二届开放原子大赛 Spring AI Alibaba 应用框架挑战赛决赛于 2 月 23 日在北京圆满落幕。
Spring框架初识
Spring 是一个分层的轻量级开源框架,核心功能包括控制反转(IOC)和面向切面编程(AOP)。主要模块有核心容器、Spring 上下文、AOP、DAO、ORM、Web 模块和 MVC 框架。它通过 IOC 将配置与代码分离,简化开发;AOP 提供了声明性事务管理等增强功能。
73 21
Spring框架初识
通过springboot框架创建对象(一)
在Spring Boot中,对象创建依赖于Spring框架的核心特性——控制反转(IoC)和依赖注入(DI)。IoC将对象的创建和管理交由Spring应用上下文负责,开发者只需定义依赖关系。DI通过构造函数、setter方法或字段注入实现依赖对象的传递。Spring Boot的自动配置机制基于类路径和配置文件,自动为应用程序配置Spring容器,简化开发过程。Bean的生命周期包括定义扫描、实例化、依赖注入、初始化和销毁回调,均由Spring容器管理。这些特性提高了开发效率并简化了代码维护。
对Spring、SpringMVC、MyBatis框架的介绍与解释
Spring 框架提供了全面的基础设施支持,Spring MVC 专注于 Web 层的开发,而 MyBatis 则是一个高效的持久层框架。这三个框架结合使用,可以显著提升 Java 企业级应用的开发效率和质量。通过理解它们的核心特性和使用方法,开发者可以更好地构建和维护复杂的应用程序。
136 29
Spring Boot中的日志框架选择
在Spring Boot开发中,日志管理至关重要。常见的日志框架有Logback、Log4j2、Java Util Logging和Slf4j。选择合适的日志框架需考虑性能、灵活性、社区支持及集成配置。本文以Logback为例,演示了如何记录不同级别的日志消息,并强调合理配置日志框架对提升系统可靠性和开发效率的重要性。

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等