Spring Batch 批处理框架,真心强呀!! 下

简介: Spring Batch 批处理框架,真心强呀!! 下
C、任务决策

决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则之心 step1 和 step3。

@Component
public class MyDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        LocalDate now = LocalDate.now();
        DayOfWeek dayOfWeek = now.getDayOfWeek();
        if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
            return new FlowExecutionStatus("weekend");
        } else {
            return new FlowExecutionStatus("workingDay");
        }
    }
}
@Bean
public Job deciderJob() {
 return jobBuilderFactory.get("deciderJob")
   .start(step1())
   .next(myDecider)
   .from(myDecider).on("weekend").to(step2())
   .from(myDecider).on("workingDay").to(step3())
   .from(step3()).on("*").to(step4())
   .end()
   .build();
}
private Step step1() {
 return stepBuilderFactory.get("step1")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤一操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
private Step step2() {
 return stepBuilderFactory.get("step2")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤二操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
private Step step3() {
 return stepBuilderFactory.get("step3")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤三操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
private Step step4() {
 return stepBuilderFactory.get("step4")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤四操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}
D、任务嵌套

任务 Job 除了可以由 Step 或者 Flow 构成外,我们还可以将多个任务 Job 转换为特殊的 Step,然后再赋给另一个任务 Job,这就是任务的嵌套。

@Component
public class NestedJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;
    // 父任务
    @Bean
    public Job parentJob() {
        return jobBuilderFactory.get("parentJob")
                .start(childJobOneStep())
                .next(childJobTwoStep())
                .build();
    }
    // 将任务转换为特殊的步骤
    private Step childJobOneStep() {
        return new JobStepBuilder(new StepBuilder("childJobOneStep"))
                .job(childJobOne())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
    // 将任务转换为特殊的步骤
    private Step childJobTwoStep() {
        return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
                .job(childJobTwo())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
    // 子任务一
    private Job childJobOne() {
        return jobBuilderFactory.get("childJobOne")
                .start(
                    stepBuilderFactory.get("childJobOneStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务一执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
    // 子任务二
    private Job childJobTwo() {
        return jobBuilderFactory.get("childJobTwo")
                .start(
                    stepBuilderFactory.get("childJobTwoStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务二执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
}

4.2、读取数据

定义 Model TestData,下面同一

@Data
public class TestData {
    private int id;
    private String field1;
    private String field2;
    private String field3;
}

读取数据包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等,具体自己查资料。

文本数据读取 Demo

@Component
public class FileItemReaderDemo {
    // 任务创建工厂
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    // 步骤创建工厂
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job fileItemReaderJob() {
        return jobBuilderFactory.get("fileItemReaderJob2")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(fileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
    private ItemReader<TestData> fileItemReader() {
        FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("reader/file")); // 设置文件资源地址
        reader.setLinesToSkip(1); // 忽略第一行
        // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
        // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // 设置属性名,类似于表头
        tokenizer.setNames("id", "field1", "field2", "field3");
        // 将每行数据转换为TestData对象
        DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();
        // 设置LineTokenizer
        mapper.setLineTokenizer(tokenizer);
        // 设置映射方式,即读取到的文本怎么转换为对应的POJO
        mapper.setFieldSetMapper(fieldSet -> {
            TestData data = new TestData();
            data.setId(fieldSet.readInt("id"));
            data.setField1(fieldSet.readString("field1"));
            data.setField2(fieldSet.readString("field2"));
            data.setField3(fieldSet.readString("field3"));
            return data;
        });
        reader.setLineMapper(mapper);
        return reader;
    }
}

4.3、输出数据

输出数据也包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等

Component
public class FileItemWriterDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Resource(name = "writerSimpleReader")
    private ListItemReader<TestData> writerSimpleReader;
    @Bean
    public Job fileItemWriterJob() throws Exception {
        return jobBuilderFactory.get("fileItemWriterJob")
                .start(step())
                .build();
    }
    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(writerSimpleReader)
                .writer(fileItemWriter())
                .build();
    }
    private FlatFileItemWriter<TestData> fileItemWriter() throws Exception {
        FlatFileItemWriter<TestData> writer = new FlatFileItemWriter<>();
        FileSystemResource file = new FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");
        Path path = Paths.get(file.getPath());
        if (!Files.exists(path)) {
            Files.createFile(path);
        }
        // 设置输出文件路径
        writer.setResource(file);
        // 把读到的每个TestData对象转换为JSON字符串
        LineAggregator<TestData> aggregator = item -> {
            try {
                ObjectMapper mapper = new ObjectMapper();
                return mapper.writeValueAsString(item);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return "";
        };
        writer.setLineAggregator(aggregator);
        writer.afterPropertiesSet();
        return writer;
    }
}

4.5、处理数据

@Component
public class ValidatingItemProcessorDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Resource(name = "processorSimpleReader")
    private ListItemReader<TestData> processorSimpleReader;
    @Bean
    public Job validatingItemProcessorJob() throws Exception {
        return jobBuilderFactory.get("validatingItemProcessorJob3")
                .start(step())
                .build();
    }
    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(processorSimpleReader)
                .processor(beanValidatingItemProcessor())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
//    private ValidatingItemProcessor<TestData> validatingItemProcessor() {
//        ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>();
//        processor.setValidator(value -> {
//            // 对每一条数据进行校验
//            if ("".equals(value.getField3())) {
//                // 如果field3的值为空串,则抛异常
//                throw new ValidationException("field3的值不合法");
//            }
//        });
//        return processor;
//    }
    private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception {
        BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
        // 开启过滤,不符合规则的数据被过滤掉;
//        beanValidatingItemProcessor.setFilter(true);
        beanValidatingItemProcessor.afterPropertiesSet();
        return beanValidatingItemProcessor;
    }
}

4.6、任务调度

可以配合 quartz 或者 xxljob 实现定时任务执行

@RestController
@RequestMapping("job")
public class JobController {
    @Autowired
    private Job job;
    @Autowired
    private JobLauncher jobLauncher;
    @GetMapping("launcher/{message}")
    public String launcher(@PathVariable String message) throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                .addString("message", message)
                .toJobParameters();
        // 将参数传递给任务
        jobLauncher.run(job, parameters);
        return "success";
    }
}


相关文章
|
7月前
|
安全 Java Ruby
我尝试了所有后端框架 — — 这就是为什么只有 Spring Boot 幸存下来
作者回顾后端开发历程,指出多数框架在生产环境中难堪重负。相比之下,Spring Boot凭借内置安全、稳定扩展、完善生态和企业级支持,成为构建高可用系统的首选,真正经受住了时间与规模的考验。
543 2
|
8月前
|
XML JSON Java
Spring框架中常见注解的使用规则与最佳实践
本文介绍了Spring框架中常见注解的使用规则与最佳实践,重点对比了URL参数与表单参数的区别,并详细说明了@RequestParam、@PathVariable、@RequestBody等注解的应用场景。同时通过表格和案例分析,帮助开发者正确选择参数绑定方式,避免常见误区,提升代码的可读性与安全性。
|
6月前
|
安全 前端开发 Java
《深入理解Spring》:现代Java开发的核心框架
Spring自2003年诞生以来,已成为Java企业级开发的基石,凭借IoC、AOP、声明式编程等核心特性,极大简化了开发复杂度。本系列将深入解析Spring框架核心原理及Spring Boot、Cloud、Security等生态组件,助力开发者构建高效、可扩展的应用体系。(238字)
|
6月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
460 8
|
7月前
|
监控 Kubernetes Cloud Native
Spring Batch 批处理框架技术详解与实践指南
本文档全面介绍 Spring Batch 批处理框架的核心架构、关键组件和实际应用场景。作为 Spring 生态系统中专门处理大规模数据批处理的框架,Spring Batch 为企业级批处理作业提供了可靠的解决方案。本文将深入探讨其作业流程、组件模型、错误处理机制、性能优化策略以及与现代云原生环境的集成方式,帮助开发者构建高效、稳定的批处理系统。
732 1
|
9月前
|
安全 Java 微服务
Java 最新技术和框架实操:涵盖 JDK 21 新特性与 Spring Security 6.x 安全框架搭建
本文系统整理了Java最新技术与主流框架实操内容,涵盖Java 17+新特性(如模式匹配、文本块、记录类)、Spring Boot 3微服务开发、响应式编程(WebFlux)、容器化部署(Docker+K8s)、测试与CI/CD实践,附完整代码示例和学习资源推荐,助你构建现代Java全栈开发能力。
901 1
|
8月前
|
Cloud Native Java API
Java Spring框架技术栈选和最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡
Java Spring框架技术栈选和最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡
1476 0
|
Java Spring
spring框架之AOP模块(面向切面),附带通知类型---超详细介绍
spring框架之AOP模块(面向切面),附带通知类型---超详细介绍
398 0
|
缓存 监控 Java
Spring框架之AOP(面向切面编程)
Spring框架之AOP(面向切面编程)
228 0
|
XML 设计模式 安全
【Spring框架四】——Spring AOP 注解实现和xml方式实现1
【Spring框架四】——Spring AOP 注解实现和xml方式实现
302 0

热门文章

最新文章