一、SpringBatch 介绍
Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统的日常操作至关重要的健壮的批处理应用程序。Spring Batch 建立在人们期望的 Spring Framework 特性(生产力、基于 POJO 的开发方法和一般易用性)的基础上,同时使开发人员可以在必要时轻松访问和使用更高级的企业服务。
Spring Batch 不是一个调度框架。在商业和开源领域都有许多优秀的企业调度程序(例如 Quartz、Tivoli、Control-M 等)。Spring Batch 旨在与调度程序结合使用,而不是替代调度程序。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
二、业务场景
我们在业务开发中经常遇到这种情况:
Spring Batch 支持以下业务场景:
- 定期提交批处理。
- 并发批处理:并行处理作业。
- 分阶段的企业消息驱动处理。
- 大规模并行批处理。
- 失败后手动或计划重启。
- 相关步骤的顺序处理(扩展到工作流驱动的批次)。
- 部分处理:跳过记录(例如,在回滚时)。
- 整批交易,适用于批量较小或已有存储过程或脚本的情况。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
三、基础知识
3.1、整体架构
名称 | 作用 |
JobRepository | 为所有的原型(Job、JobInstance、Step)提供持久化的机制 |
JobLauncher | JobLauncher 表示一个简单的接口,用于启动一个Job 给定的集合 JobParameters |
Job | Job 是封装了整个批处理过程的实体 |
Step | Step 是一个域对象,它封装了批处理作业的一个独立的顺序阶段 |
3.2、核心接口
ItemReader
: is an abstraction that represents the output of aStep
, one batch or chunk of items at a timeItemProcessor
:an abstraction that represents the business processing of an item.ItemWriter
: is an abstraction that represents the output of aStep
, one batch or chunk of items at a time.
大体即为 输入
→数据加工
→输出
,一个Job
定义多个Step
及处理流程,一个Step
通常涵盖ItemReader
、ItemProcessor
、ItemWriter
四、基础实操
4.0、引入 SpringBatch
pom 文件引入 springboot
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
pom 文件引入 spring-batch 及相关依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> </dependencies>
mysql 创建依赖的库表
sql 脚本的 jar 包路径:.....\maven\repository\org\springframework\batch\spring-batch-core\4.2.1.RELEASE\spring-batch-core-4.2.1.RELEASE.jar!\org\springframework\batch\core\schema-mysql.sql
启动类标志@EnableBatchProcessing
@SpringBootApplication @EnableBatchProcessing public class SpringBatchStartApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchStartApplication.class, args); } }
FirstJobDemo
@Component public class FirstJobDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job firstJob() { return jobBuilderFactory.get("firstJob") .start(step()) .build(); } private Step step() { return stepBuilderFactory.get("step") .tasklet((contribution, chunkContext) -> { System.out.println("执行步骤...."); return RepeatStatus.FINISHED; }).build(); } }
4.1、流程控制
A、多步骤任务
@Bean public Job multiStepJob() { return jobBuilderFactory.get("multiStepJob2") .start(step1()) .on(ExitStatus.COMPLETED.getExitCode()).to(step2()) .from(step2()) .on(ExitStatus.COMPLETED.getExitCode()).to(step3()) .from(step3()).end() .build(); } private Step step1() { return stepBuilderFactory.get("step1") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤一操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Step step2() { return stepBuilderFactory.get("step2") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤二操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Step step3() { return stepBuilderFactory.get("step3") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤三操作。。。"); return RepeatStatus.FINISHED; }).build(); }
B、并行执行
创建了两个 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通过JobBuilderFactory
的split
方法,指定一个异步执行器,将 flow1 和 flow2 异步执行(也就是并行)
@Component public class SplitJobDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job splitJob() { return jobBuilderFactory.get("splitJob") .start(flow1()) .split(new SimpleAsyncTaskExecutor()).add(flow2()) .end() .build(); } private Step step1() { return stepBuilderFactory.get("step1") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤一操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Step step2() { return stepBuilderFactory.get("step2") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤二操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Step step3() { return stepBuilderFactory.get("step3") .tasklet((stepContribution, chunkContext) -> { System.out.println("执行步骤三操作。。。"); return RepeatStatus.FINISHED; }).build(); } private Flow flow1() { return new FlowBuilder<Flow>("flow1") .start(step1()) .next(step2()) .build(); } private Flow flow2() { return new FlowBuilder<Flow>("flow2") .start(step3()) .build(); } }