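Spring Batch Study Notes
For more on Spring Batch concepts, see: https://mp.weixin.qq.com/s/OUMwyo2EopXkSHGn2OlghQ
Basic Spring Batch concepts:
ItemReader: reads from a resource, e.g. querying a database, reading a file, or reading from a variable.
ItemProcessor: processes the items that were read. This is where your own business logic goes, such as calculations, rule checks, or format conversion.
ItemWriter: writes to a resource, e.g. writing to a database or a file, or printing to the log.
Step: one complete batch-processing step. A chunk-oriented Step is made up of an ItemReader, an ItemProcessor and an ItemWriter (a Step can also wrap a single Tasklet instead).
Job: a complete batch process, made up of one or more Steps.
Listener: observes the state of Steps and Jobs. By implementing its callback methods you can add logic such as logging.
JobLauncher: starts a Job.
JobRepository: stores metadata about Jobs and their executions (status, parameters and so on).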
Simple usage
1. Create a Spring Boot project.
2. Create a package named config in the project, and create a configuration class in it.
Annotate the class with both @Configuration and @EnableBatchProcessing.
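import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// JobBuilderFactory and StepBuilderFactory are the Spring Batch 4.x builders (deprecated in Spring Batch 5)
@Configuration
@EnableBatchProcessing
public class SingleJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the Job, the entry point of the batch run.
     * @return a Job that runs step1
     */
    @Bean
    public Job helloJob() {
        return jobBuilderFactory.get("hellojob-1")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step-1").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello spring batch");
                        // FINISHED tells Spring Batch this tasklet is done and must not be called again
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }
}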
3. Now start the application. The console shows the following error:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-22 13:00:10.396 ERROR 15548 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter :
***************************
APPLICATION FAILED TO START
***************************
Description:
Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.
Reason: Failed to determine a suitable driver class
Action:
Consider the following:
If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).
Solution: Spring Batch needs a database at runtime to store its job metadata, so we have to configure a data source.
Option 1: use the H2 in-memory database.
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
Option 2: use a MySQL database.
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
MySQL connection settings:
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
spring.batch.initialize-schema=always
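Spring Batch in more detail
A Step can be implemented in two ways: as a tasklet or as a chunk.
Tasklet style:
The Step executes a single task. A Job is made up of several Steps arranged in a given order, and each Step performs one specific task.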
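To make the tasklet contract concrete: Spring Batch keeps calling a Tasklet's execute method while it returns RepeatStatus.CONTINUABLE, and stops once it returns RepeatStatus.FINISHED. The plain-Java sketch below models only that loop; RepeatState, SimpleTasklet and TaskletRunner are hypothetical names, not Spring Batch classes.

```java
// Hypothetical stand-in for Spring Batch's RepeatStatus
enum RepeatState { CONTINUABLE, FINISHED }

// Hypothetical stand-in for the Tasklet interface
interface SimpleTasklet {
    RepeatState execute();
}

class TaskletRunner {
    // Calls the tasklet repeatedly until it returns FINISHED; returns how many calls were made
    static int run(SimpleTasklet tasklet) {
        int calls = 0;
        RepeatState state;
        do {
            state = tasklet.execute();
            calls++;
        } while (state == RepeatState.CONTINUABLE);
        return calls;
    }

    public static void main(String[] args) {
        int[] remaining = {3};
        int calls = run(() -> {
            System.out.println("working, remaining = " + remaining[0]);
            remaining[0]--;
            return remaining[0] > 0 ? RepeatState.CONTINUABLE : RepeatState.FINISHED;
        });
        System.out.println("tasklet called " + calls + " times"); // tasklet called 3 times
    }
}
```

The hello-world tasklets in this article return FINISHED immediately, so each of them runs exactly once.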
Chunk style:
The Step works on chunks of data (a portion of the data at a time), and each chunk goes through Read-Process-Write.
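The chunk style can be pictured as a loop. The sketch below is a simplified plain-Java model, not Spring Batch code: ChunkLoop is a hypothetical name, the reader is a Supplier that returns null when the input is exhausted (the same end-of-data signal an ItemReader uses), and a null result from the processor drops the item, mirroring how Spring Batch treats null from an ItemProcessor. Transactions, retries and skips are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class ChunkLoop {
    /**
     * Reads items until the reader returns null, processes each one and passes
     * the results to the writer in chunks of at most chunkSize items read.
     * Returns the number of writer calls.
     */
    static <I, O> int run(Supplier<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int writes = 0;
        boolean exhausted = false;
        while (!exhausted) {
            List<O> chunk = new ArrayList<>();
            for (int read = 0; read < chunkSize; read++) {
                I item = reader.get();
                if (item == null) {      // null means no more input
                    exhausted = true;
                    break;
                }
                O out = processor.apply(item);
                if (out != null) {       // null from the processor filters the item out
                    chunk.add(out);
                }
            }
            if (!chunk.isEmpty()) {      // this model skips empty chunks
                writer.accept(chunk);
                writes++;
            }
        }
        return writes;
    }

    public static void main(String[] args) {
        Iterator<Integer> source = List.of(1, 2, 3, 4, 5).iterator();
        int writes = run(() -> source.hasNext() ? source.next() : null,
                x -> x * 10,
                chunk -> System.out.println("write " + chunk),
                2);
        System.out.println(writes + " writes");
    }
}
```

With five items and a chunk size of 2, main prints write [10, 20], write [30, 40], write [50] and then 3 writes: the writer always receives a whole chunk, never a single item.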
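- The Spring Batch framework has four main roles:
JobLauncher: the job launcher. Jobs are started through it, so it can be regarded as the program's entry point.
Job: represents one concrete job.
Step: represents one concrete step. A Job consists of one or more Steps.
JobRepository: where the data is stored. It can be regarded as an interface to a database; while a job runs, its status and related information are recorded through it.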
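A toy model of how these four roles relate, assuming a purely sequential job that stops at the first failed step. MiniStep, MiniJob, MiniLauncher and MiniRepository are hypothetical stand-ins for Step, Job, JobLauncher and JobRepository; the real JobRepository persists JobInstance, JobExecution and StepExecution records in the tables created by the schema script.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JobRepository: remembers the status of every job and step run
class MiniRepository {
    final Map<String, String> statuses = new LinkedHashMap<>();

    void record(String key, String status) {
        statuses.put(key, status);
    }
}

// Hypothetical stand-in for Step: a name plus the work it performs
class MiniStep {
    final String name;
    final Runnable work;

    MiniStep(String name, Runnable work) {
        this.name = name;
        this.work = work;
    }
}

// Hypothetical stand-in for Job: an ordered list of steps
class MiniJob {
    final String name;
    final List<MiniStep> steps;

    MiniJob(String name, List<MiniStep> steps) {
        this.name = name;
        this.steps = steps;
    }
}

// Hypothetical stand-in for JobLauncher: runs the steps in order and reports to the repository
class MiniLauncher {
    private final MiniRepository repository;

    MiniLauncher(MiniRepository repository) {
        this.repository = repository;
    }

    String run(MiniJob job) {
        for (MiniStep step : job.steps) {
            String key = job.name + "/" + step.name;
            try {
                step.work.run();
                repository.record(key, "COMPLETED");
            } catch (RuntimeException e) {
                repository.record(key, "FAILED");
                repository.record(job.name, "FAILED"); // later steps are not run
                return "FAILED";
            }
        }
        repository.record(job.name, "COMPLETED");
        return "COMPLETED";
    }

    public static void main(String[] args) {
        MiniRepository repository = new MiniRepository();
        MiniJob job = new MiniJob("helloJob", List.of(
                new MiniStep("step-1", () -> System.out.println("hello spring batch"))));
        new MiniLauncher(repository).run(job);
        System.out.println(repository.statuses); // {helloJob/step-1=COMPLETED, helloJob=COMPLETED}
    }
}
```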
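The two ways to implement a Step:
// Option 1: chunk-oriented step
// STEP, Music, itemReader() and itemWriter() are assumed to be defined elsewhere in the project
@Bean
Step step() {
    return stepBuilderFactory.get(STEP)
            .<Music, Music>chunk(2)   // read and write 2 items per chunk
            .reader(itemReader())
            .writer(itemWriter())     // no processor: items go straight from reader to writer
            .build();
}

// Option 2: tasklet step
@Bean
public Step step1() {
    return stepBuilderFactory.get("step-1").tasklet(
            new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                    System.out.println("hello spring batch");
                    return RepeatStatus.FINISHED;
                }
            }
    ).build();
}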
Creating and using a Flow
- A Flow is a collection of Steps.
- A Flow can be reused by multiple Jobs.
- A Flow is created with FlowBuilder.
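import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class FlowDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Creates Step1.
     * @return Step1
     */
    @Bean
    public Step flowDemoStep1() {
        return stepBuilderFactory.get("step-1").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello spring flowDemoStep1");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    /**
     * Creates Step2.
     * @return Step2
     */
    @Bean
    public Step flowDemoStep2() {
        return stepBuilderFactory.get("step-2").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello spring flowDemoStep2");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    /**
     * Creates Step3.
     * @return Step3
     */
    @Bean
    public Step flowDemoStep3() {
        return stepBuilderFactory.get("step-3").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello spring flowDemoStep3");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    /**
     * Creates the Flow object and declares which Steps it contains: Step1, then Step2.
     */
    public Flow flowDemoFlow() {
        return new FlowBuilder<Flow>("flowDemoFlow")
                .start(flowDemoStep1())
                .next(flowDemoStep2())
                .build();
    }

    /**
     * Creates the Job: run the Flow first, then Step3.
     * Named flowDemoJob rather than job so it does not clash with other demo Job beans.
     * @return the Job
     */
    @Bean
    public Job flowDemoJob() {
        return jobBuilderFactory.get("flowjob")
                .start(flowDemoFlow())
                .next(flowDemoStep3())
                .end()
                .build();
    }
}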
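Because a Flow is just a named, ordered group of Steps, the same Flow object can be placed into more than one Job. The hypothetical plain-Java model below (MiniFlow and FlowJobModel are not Spring Batch classes) only computes the resulting step order: flowjob above runs step-1 and step-2 from the flow, then step-3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a Flow is an ordered, reusable group of step names
class MiniFlow {
    final List<String> steps;

    MiniFlow(String... steps) {
        this.steps = List.of(steps);
    }
}

class FlowJobModel {
    // Execution order of a job that starts with the flow and then runs extra steps
    static List<String> order(MiniFlow flow, String... extraSteps) {
        List<String> order = new ArrayList<>(flow.steps);
        order.addAll(List.of(extraSteps));
        return order;
    }

    public static void main(String[] args) {
        MiniFlow flowDemoFlow = new MiniFlow("step-1", "step-2");
        System.out.println(order(flowDemoFlow, "step-3")); // [step-1, step-2, step-3]
        System.out.println(order(flowDemoFlow));           // a second job reusing the flow: [step-1, step-2]
    }
}
```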
Running Steps concurrently with split
split lets several Steps or Flows in a Job run concurrently.
Key code:
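import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class SplitDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Creates Step1.
     * @return Step1
     */
    @Bean
    public Step splitDemoStep1() {
        return stepBuilderFactory.get("step-1").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello spring splitDemoStep1");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    /**
     * Creates Step2.
     * @return Step2
     */
    @Bean
    public Step splitDemoStep2() {
        return stepBuilderFactory.get("step-2").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello spring splitDemoStep2");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    /**
     * Creates Step3.
     * @return Step3
     */
    @Bean
    public Step splitDemoStep3() {
        return stepBuilderFactory.get("step-3").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("hello spring splitDemoStep3");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    /**
     * Creates the Flow objects and declares which Steps each Flow contains.
     * splitDemoFlow1 contains Step1.
     */
    public Flow splitDemoFlow1() {
        return new FlowBuilder<Flow>("splitDemoFlow1")
                .start(splitDemoStep1())
                .build();
    }

    /**
     * splitDemoFlow2 contains Step2 followed by Step3.
     */
    public Flow splitDemoFlow2() {
        return new FlowBuilder<Flow>("splitDemoFlow2")
                .start(splitDemoStep2())
                .next(splitDemoStep3())
                .build();
    }

    /**
     * Creates the Job: splitDemoFlow1 and splitDemoFlow2 run in parallel.
     * Named splitDemoJob rather than job so it does not clash with other demo Job beans.
     * @return the Job
     */
    @Bean
    public Job splitDemoJob() {
        return jobBuilderFactory.get("splitDemoJob")
                .start(splitDemoFlow1())
                .split(new SimpleAsyncTaskExecutor()) // each Flow runs on its own thread
                .add(splitDemoFlow2())
                .end()
                .build();
    }
}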
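The effect of split(...).add(...) can be modelled with a thread pool: each Flow runs on its own thread, the Steps inside one Flow stay sequential, and the Job continues only after every Flow has finished. This is a hypothetical plain-Java sketch (SplitModel is an illustrative name); the demo itself uses Spring's SimpleAsyncTaskExecutor, which starts a new thread for each task instead of reusing pooled threads.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SplitModel {
    // Runs each flow (a list of step names) on its own thread and waits for all of them
    static List<String> runSplit(List<List<String>> flows) {
        List<String> executed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, flows.size()));
        try {
            CompletableFuture<?>[] futures = flows.stream()
                    // steps inside one flow run sequentially on that flow's thread
                    .map(flow -> CompletableFuture.runAsync(() -> flow.forEach(executed::add), pool))
                    .toArray(CompletableFuture[]::new);
            // like end() after split(...).add(...): continue only when every flow is done
            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown();
        }
        return executed;
    }

    public static void main(String[] args) {
        List<String> executed = runSplit(List.of(
                List.of("step-1"),               // splitDemoFlow1
                List.of("step-2", "step-3")));   // splitDemoFlow2
        System.out.println(executed);
    }
}
```

The position of step-1 relative to step-2 and step-3 can differ from run to run; only the order inside splitDemoFlow2 (step-2 before step-3) is fixed.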
Using a decider
Interface: JobExecutionDecider
Custom decider MyDecider.java: implement the JobExecutionDecider interface and override its decide method.
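import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class MyDecider implements JobExecutionDecider {
    // Number of times decide() has been called; the state lives as long as this object
    private int count;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count++;
        // The returned status is matched against the on("...") patterns in the Job definition
        if (count % 2 == 0) {
            return new FlowExecutionStatus("even");
        } else {
            return new FlowExecutionStatus("odd");
        }
    }
}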
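import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class DeciderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step deciderDemoStep1() {
        return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("hello spring deciderDemoStep1");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    // Runs when the decider returns "even"
    @Bean
    public Step deciderDemoStep2() {
        return stepBuilderFactory.get("deciderDemoStep2").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("even");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    // Runs when the decider returns "odd"
    @Bean
    public Step deciderDemoStep3() {
        return stepBuilderFactory.get("deciderDemoStep3").tasklet(
                new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("odd");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }

    /**
     * Creates the decider. Because this is a proxied @Configuration class,
     * every myDecider() call below returns the same singleton instance.
     * @return the decider
     */
    @Bean
    public JobExecutionDecider myDecider() {
        return new MyDecider();
    }

    /**
     * Step1 runs first, then the decider picks the next Step:
     * "even" leads to Step2, "odd" leads to Step3, and any status from Step3 goes back to the decider.
     */
    @Bean
    public Job deciderDemoJob() {
        return jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                .from(deciderDemoStep3()).on("*").to(myDecider())
                .end()
                .build();
    }
}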
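Tracing the transitions by hand shows what deciderDemoJob prints. The plain-Java sketch below (DeciderTrace is a hypothetical name, not part of the project) replays the same counting rule as MyDecider and the same from/on/to wiring: Step1 runs, the first decision is "odd" so Step3 runs, control returns to the decider, the second decision is "even" so Step2 runs and the flow ends.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java trace of the transitions wired up in deciderDemoJob (not Spring Batch code)
class DeciderTrace {
    private int count; // same counting rule as MyDecider

    private String decide() {
        count++;
        return count % 2 == 0 ? "even" : "odd";
    }

    List<String> run() {
        List<String> printed = new ArrayList<>();
        printed.add("hello spring deciderDemoStep1");   // start(deciderDemoStep1())
        while (true) {
            String status = decide();                   // next(myDecider())
            if (status.equals("even")) {
                printed.add("even");                    // on("even").to(deciderDemoStep2()); no further transition
                return printed;
            }
            printed.add("odd");                         // on("odd").to(deciderDemoStep3())
            // from(deciderDemoStep3()).on("*").to(myDecider()): loop back to the decider
        }
    }

    public static void main(String[] args) {
        new DeciderTrace().run().forEach(System.out::println);
    }
}
```

Running main prints hello spring deciderDemoStep1, odd, even, which is the console output to expect from one run of deciderDemoJob.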
Complete example
The main code is as follows:
1. Overall project structure diagram.
2. Add the dependencies to pom.xml.
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Spring Batch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<!-- MySQL driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<!-- JDBC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- MyBatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Other utility libraries -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
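3. Configure the database connection.
server:
  port: 8081
# MySQL connection settings
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/xk_test?useSSL=false
    username: root
    password: root
    # Script that creates the Spring Batch metadata tables
    schema: classpath:/org/springframework/batch/core/schema-mysql.sql
  batch:
    initialize-schema: always
    job:
      # Do not run jobs automatically at startup; the controller in step 8 launches them
      enabled: false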
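4. Reading data: CityDataReader.java
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// City and CityService are the project's own entity and service classes (not shown)
@Service
@Slf4j
@StepScope
public class CityDataReader implements ItemReader<City> {
    @Autowired
    private CityService cityService;

    private List<City> cityInfoLists = null;
    private int nextCityIndex = 0;

    /**
     * Returns the next City, or null when there is no more data,
     * which tells Spring Batch that reading is finished.
     */
    @Override
    public City read() throws Exception {
        // Load the data only once; reloading on every read() would query the database for each item
        if (cityInfoLists == null) {
            this.init();
        }
        if (nextCityIndex < cityInfoLists.size()) {
            City city = cityInfoLists.get(nextCityIndex);
            nextCityIndex++;
            return city;
        }
        return null;
    }

    private void init() {
        cityInfoLists = cityService.getCityInfoLists();
    }
}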
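A Spring-free model of the reader contract: load the data once, hand out one item per read() call, and return null when the data runs out. LazyListReader is a hypothetical name. Because CityDataReader is @StepScope, a new instance (and therefore a fresh index) is created for each step execution. For data that already sits in a List, Spring Batch also ships org.springframework.batch.item.support.ListItemReader.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, Spring-free model of the reader contract used by CityDataReader
class LazyListReader<T> {
    private final Supplier<List<T>> loader; // plays the role of cityService.getCityInfoLists()
    private List<T> items;                  // loaded lazily, exactly once
    private int nextIndex = 0;
    int loadCount = 0;                      // only here to show the loader runs once

    LazyListReader(Supplier<List<T>> loader) {
        this.loader = loader;
    }

    // Returns the next item, or null once all items have been handed out
    T read() {
        if (items == null) {
            items = loader.get();
            loadCount++;
        }
        return nextIndex < items.size() ? items.get(nextIndex++) : null;
    }

    public static void main(String[] args) {
        LazyListReader<String> reader = new LazyListReader<>(() -> List.of("Beijing", "Shanghai"));
        String city;
        while ((city = reader.read()) != null) {
            System.out.println("read " + city);
        }
        System.out.println("loader called " + reader.loadCount + " time(s)"); // loader called 1 time(s)
    }
}
```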
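5. Processing data: CityDataProcess.java
import java.util.Date;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@StepScope
public class CityDataProcess implements ItemProcessor<City, City> {
    // Job parameters are bound late, which only works on @StepScope (or @JobScope) beans
    @Value("#{jobParameters['startDate']}")
    private Date date;
    @Value("#{jobParameters['name']}")
    private String name;

    @Override
    public City process(City city) throws Exception {
        log.info("Start processing record {} ---- parameters {}:{}.........", city.getId(), date, name);
        // Business logic: build the full address from province, city and district
        String detail = city.getProvince() + "-" + city.getCity() + "-" + city.getDistrict();
        city.setDetail(detail);
        log.info("Record {} processed......", city.getId());
        return city;
    }
}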
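6. Writing data: CityDataWriter.java
import java.util.Date;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
@StepScope
@Slf4j
public class CityDataWriter implements ItemWriter<City> {
    @Autowired
    private CityService cityService;
    @Value("#{jobParameters['startDate']}")
    private Date date;
    @Value("#{jobParameters['name']}")
    private String name;

    /**
     * Called once per chunk with all items processed in that chunk.
     */
    @Override
    public void write(List<? extends City> cities) throws Exception {
        for (City city : cities) {
            log.info("Start updating record {}.........", city.getId());
            int update = cityService.update(city);
            if (update > 0) {
                log.info("Record {} updated.........", city.getId());
            }
        }
    }
}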
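7. Configuring the Job: JobConfig.java
import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// MyTasketOne, MyJobExecutionListener and MyStepExecutionListener are project classes (not shown)
@Configuration
public class JobConfig {
    private static final Logger log = LoggerFactory.getLogger(JobConfig.class);
    @Autowired
    JobBuilderFactory jobBuilderFactory;
    @Autowired
    StepBuilderFactory stepBuilderFactory;
    @Autowired
    SqlSessionFactory sqlSessionFactory;
    @Autowired
    private CityDataReader cityDataReader;
    @Autowired
    private CityDataProcess cityDataProcess;
    @Autowired
    private CityDataWriter cityDataWriter;
    @Autowired
    private MyTasketOne myTasketOne;

    // Configure the Job: singlestep runs first, then singlestep2
    @Bean("singleJob")
    public Job singleStepJob() {
        return jobBuilderFactory.get("singleJob")
                .start(singlestep())
                .next(singlestep2())
                .listener(new MyJobExecutionListener())
                .build();
    }

    // First Step: chunk-oriented, 9 items per chunk (one transaction per chunk)
    @Bean
    public Step singlestep() {
        return stepBuilderFactory.get("singlestep")
                .<City, City>chunk(9)
                .reader(cityDataReader)     // read logic
                .processor(cityDataProcess) // processing logic (business logic)
                .writer(cityDataWriter)     // write logic
                .listener(new MyStepExecutionListener()) // register a listener
                .build();
    }

    // Second Step: a tasklet step
    @Bean
    public Step singlestep2() {
        return stepBuilderFactory.get("singlestep2")
                .tasklet(myTasketOne)
                .listener(new MyStepExecutionListener()) // register a listener
                .build();
    }
}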
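8. Create a controller to launch the Job.
import cn.hutool.core.date.DateUtil;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("batch")
public class JobStartController {
    @Autowired
    private Job singleJob;
    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Launches the Job.
     * @return a message confirming the launch
     * @throws Exception if the Job cannot be launched
     */
    @GetMapping("/start")
    public String invokeStep() throws Exception {
        // startDate changes on every request, so each launch uses a new set of parameters
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("startDate", DateUtil.date(System.currentTimeMillis()))
                .addString("name", "xxkfz")
                .toJobParameters();
        jobLauncher.run(singleJob, jobParameters);
        return "The job has been started.......";
    }
}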
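The controller passes the current time as startDate for a reason: Spring Batch identifies a JobInstance by the job name plus its identifying JobParameters, and launching an instance that has already completed with the same parameters is rejected with JobInstanceAlreadyCompleteException. The hypothetical model below (InstanceRegistryModel is not a Spring Batch class) reproduces only that rule and assumes every run completes.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of the "one completed run per job name + parameters" rule
class InstanceRegistryModel {
    private final Set<String> completedInstances = new HashSet<>();

    // Simplified identity key: job name plus parameters sorted by name
    static String instanceKey(String jobName, Map<String, String> params) {
        return jobName + new TreeMap<>(params);
    }

    // Rejects a launch whose instance already completed; assumes every run completes
    void launch(String jobName, Map<String, String> params) {
        String key = instanceKey(jobName, params);
        if (!completedInstances.add(key)) {
            throw new IllegalStateException("Job instance already complete: " + key);
        }
    }

    public static void main(String[] args) {
        InstanceRegistryModel registry = new InstanceRegistryModel();
        registry.launch("singleJob", Map.of("startDate", "1000", "name", "xxkfz"));
        registry.launch("singleJob", Map.of("startDate", "2000", "name", "xxkfz")); // new startDate: accepted
        try {
            registry.launch("singleJob", Map.of("startDate", "1000", "name", "xxkfz"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // same parameters again: rejected
        }
    }
}
```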
9. Add the @EnableBatchProcessing annotation to the main application class.
10. Test.
Visit http://127.0.0.1:8081/batch/start to launch the Job.