Spring Batch学习记录及示例项目代码

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spring Batch学习记录及示例项目代码

在这里插入图片描述

Spring Batch学习记录总结

关于Spring Batch的相关知识点参考:https://mp.weixin.qq.com/s/OUMwyo2EopXkSHGn2OlghQ

Spring Batch的一些基本概念:
在这里插入图片描述

  • ItemReader:对资源的读处理,如从数据库查询、文件读取、变量读取等。

  • ItemProcessor:对读取的数据进行处理,可以实现自己的业务逻辑操作来对数据处理,如对数据进行计算、逻辑处理、格式转换等。

  • ItemWriter:对资源的写处理,如写入数据库、写入文件、打印日志等。

  • Step:一个完整的批处理步骤,一个Step是由ItemReader、ItemProcessor、ItemWriter三部分组成。

  • Job:代表一个完整的批处理过程,一个Job由一个或多个Step组成。

    在这里插入图片描述

  • Listener:监听器,可以对Step、Job状态进行监听,我们可以实现监听方法,对其进行一些逻辑处理,如打印日志等。

  • JobLauncher:负责启动Job。

  • JobRepository:存储关于配置和执行的Job(作业)的元数据。

简单使用

1、创建一个SpringBoot项目。

2、在项目中创建包config,并在该包下面创建一个Configuration类。

在类上面加上@Configuration;@EnableBatchProcessing;两个注解。

    @Configuration
    @EnableBatchProcessing
    public class SingleJobConfiguration {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * 程序执行的入口
         * @return
         */
        @Bean
        public Job helloJob(){
   
   
            return jobBuilderFactory.get("hellojpb-1")
                    .start(step1())
                    .build();
        }

        @Bean
        public Step step1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
    }

3、现在启动项目,发现控制台出现以下错误信息。

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-22 13:00:10.396 ERROR 15548 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.

Reason: Failed to determine a suitable driver class


​ Action:

Consider the following:
If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).

解决方法:由于Spring Batch在运行的时候需要数据库来存储一些具体的信息;因此我们需要配置具体的数据库信息。

方式一:配置内存数据库H2。

         <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
         </dependency>

方式二:配置Mysql数据库。

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.25</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

Mysql数据库连接信息:

    spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false
    spring.datasource.username=root
    spring.datasource.password=123456

    spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
    spring.batch.initialize-schema=always

Spring Batch具体分析

  • Step有两种实现方式。一个是tasklet的方式;一个是chunk方式。

    tasklet方式:

    在Step中,执行单个任务,Job有多个Step按照一定的顺序来组成的,每个步骤执行一个具体的任务。

    chunk方式:

    该方法是基于数据块(一部分数据)执行的,每个任务又都可以分为Read-Process-Write。

  • Spring Batch框架主要有四个角色:
    JobLauncher:任务启动器,通过它来启动任务,可以看作程序的入口。
    Job:代表一个具体的任务。
    Step:代表一个具体的步骤。一个Job有1个或多个Step构成。
    JobRepository:是存储数据的地方,可以看作一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息。

Step的两种实现方式:

   //方式1   chunk
        @Bean
        Step step() {
   
   
            return stepBuilderFactory.get(STEP)
                    .<Music, Music>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build();

        }
    //方式2   tasklet
        @Bean
        public Step step1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

Flow的创建和使用

  • Flow是多个Step的集合。
  • 可以被多个Job复用。
  • 使用FlowBuilder来创建。
    @Configuration
    @EnableBatchProcessing
    public class FlowDemo {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;


        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step flowDemoStep1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step flowDemoStep2() {
   
   
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step flowDemoStep3() {
   
   
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Flow 对象   ,指明Flow对象包含哪些Step
         */
        public Flow  flowDemoFlow(){
   
   
            return new FlowBuilder<Flow>("flowDemoFlow")
                    .start(flowDemoStep1())
                    .next(flowDemoStep2())
                    .build();
        }

        /**
         * 创建Job对象
         * @return
         */
        @Bean
        public Job  job(){
   
   
            return jobBuilderFactory.get("flowjob")
                    .start(flowDemoFlow())
                    .next(flowDemoStep3())
                    .end()
                    .build();

        }

split实现并发执行

实现任务中的多个Step或多个flow并发执行。

关键代码:

    @Configuration
    @EnableBatchProcessing
    public class SplitDemo {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step splitDemoStep1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step  splitDemoStep2() {
   
   
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step  splitDemoStep3() {
   
   
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

创建Flow 对象 ,指明Flow对象包含哪些Step:

      public Flow  splitDemoFlow1(){
   
   return new FlowBuilder<Flow>("splitDemoFlow1").start(splitDemoStep1()).build();}public Flow  splitDemoFlow2(){
   
   
            return new FlowBuilder<Flow>("splitDemoFlow2")
                    .start(splitDemoStep2())
                    .next(splitDemoStep3())
                    .build();
        }
        /**
         * 创建任务
         * @return
         */
        @Bean
        public Job  job(){
   
   
            return jobBuilderFactory.get("splitDemoJob")
                    .start(splitDemoFlow1())
                    .split(new SimpleAsyncTaskExecutor())
                    .add(splitDemoFlow2())
                    .end()
                    .build();
        }
    }

决策器的使用

接口:JobExecutionDecider

自定义决策器:MyDecider.java,实现JobExecutionDecider接口并且重写decide方法

public class MyDecider implements JobExecutionDecider {
   
   
    private int count;
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
   
   
        count ++;
        if(count % 2 == 0){
   
   
            return new FlowExecutionStatus("even");
        }else {
   
   
            return new FlowExecutionStatus("odd");
        }
    }
}
@Configuration
@EnableBatchProcessing
public class DeciderDemo {
   
   

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;



    @Bean
    public Step deciderDemoStep1(){
   
   
       return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
   
   
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                System.out.println("hello spring deciderDemoStep1");
                return RepeatStatus.FINISHED;
        }
        }).build();
    }

    @Bean
    public Step deciderDemoStep2() {
   
   
        return stepBuilderFactory.get("deciderDemoStep2").tasklet(
                new Tasklet() {
   
   
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                        System.out.println("even");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }
    @Bean
    public Step deciderDemoStep3() {
   
   
        return stepBuilderFactory.get("deciderDemoStep3").tasklet(
                new Tasklet() {
   
   
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                        System.out.println("odd");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }


    /**
     * 创建决策器
     * @return
     */
    @Bean
    public JobExecutionDecider myDecider(){
   
   
        return new MyDecider();
    }

    @Bean
    public Job  deciderDemoJob(){
   
   
        return jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                .from(deciderDemoStep3()).on("*").to(myDecider())
                .end()
                .build();
    }
}

完整例子

主要代码如下:
1、项目总体结构图。
在这里插入图片描述
2、在pom.xml文件中引入依赖。

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--引入Spring Batch-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

       <!--引入mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>

        <!--引入jdbc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

       <!-- 引入mybatis-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

<!--        引入其他工具包-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

3、配置数据库的连接信息。

server:
  port: 8081

#  配置Mysql连接信息
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/xk_test?useSSL=false
    username: root
    password: root
    schema: classpath:/org/springframework/batch/core/schema-mysql.sql
  batch:
    initialize-schema: always
    job:
      enabled: false

4、读数据CityDataReader.java

@Service
@Slf4j
@StepScope
public class CityDataReader implements ItemReader<City> {
   
   

    @Autowired
    private CityService cityService;

    List<City> cityInfoLists = null;
    private int nextCityIndex = 0;

    @Override
    public City read() throws Exception {
   
   
        this.init();
        City city = null;
        if(nextCityIndex < cityInfoLists.size()){
   
   
            city = cityInfoLists.get(nextCityIndex);
            nextCityIndex ++;
            return city;
        }
        return null;
    }

    private void init() {
   
   
       cityInfoLists = cityService.getCityInfoLists();
    }
}

5、处理数据CityDataProcess.java

@Slf4j
@Service
@StepScope
public class CityDataProcess implements ItemProcessor<City,City> {
   
   

    @Value("#{jobParameters['startDate']}")
    private Date date;

    @Value("#{jobParameters['name']}")
    private String name;

    @Override
    public City process(City city) throws Exception {
   
   
        log.info("开始处理第{}条数据----参数{}:{}.........",city.getId(),date,name);
        String detail = city.getProvince() + "-" + city.getCity() + "-" + city.getDistrict();
        city.setDetail(detail);
        log.info("第{}条数据已处理完成......",city.getId());

        return city;
    }
}

6、写数据CityDataWriter.java

@Service
@StepScope
@Slf4j
public class CityDataWriter implements ItemWriter<City> {
   
   

    @Autowired
    private CityService cityService;

    @Value("#{jobParameters['startDate']}")
    private Date date;

    @Value("#{jobParameters['name']}")
    private String name;

    @Override
    public void write(List<? extends City> cities) throws Exception {
   
   
        for(City city : cities) {
   
   
            log.info("第{}条数据开始更新.........}", city.getId());
            int update = cityService.update(city);
            if (update > 0 ) {
   
   
                log.info("第{}条数据更新完成.........}", city.getId());
            }
        }
    }
}

7、配置Job。JobConfig.java

@Configuration
public class JobConfig {
   
   

    private static final Logger log = LoggerFactory.getLogger(SimpleJobConfig.class);

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    SqlSessionFactory sqlSessionFactory;

    @Autowired
    private CityDataReader cityDataReader;

    @Autowired
    private CityDataProcess cityDataProcess;

    @Autowired
    private CityDataWriter cityDataWriter;

    @Autowired
    private MyTasketOne myTasketOne;

    //配置一个Job
    @Bean("singleJob")
    public Job singleStepJob() {
   
   
        return jobBuilderFactory.get("singleJob")
                .start(singlestep())
                .next(singlestep2())
                .listener(new MyJobExecutionListener())
                .build();
    }

    //配置第一个处理Step
    @Bean
    public Step singlestep() {
   
   
        return stepBuilderFactory.get("singlestep")
                .<City,City>chunk(9)
                .reader(cityDataReader)          // 读数据逻辑
                .processor(cityDataProcess)   //数据处理逻辑——业务处理
                .writer(cityDataWriter)         //写数据逻辑
                .listener(new MyStepExecutionListener())  //配置监听器
                .build();
    }

    //配置第二个处理Step
    @Bean
    public Step singlestep2() {
   
   
        return stepBuilderFactory.get("singlestep2")
                .tasklet(myTasketOne)
                .listener(new MyStepExecutionListener())  //配置监听器
                .build();
    }
}

8、创建controller,启动Job。

@RestController
@RequestMapping("batch")
public class JobStartController {
   
   

    @Autowired
    private Job singleJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 启动 Job
     * @return
     * @throws Exception
     */
    @GetMapping("/start")
    public String invokeStep() throws Exception {
   
   
        JobParameters  jobParameters = new JobParametersBuilder()
                .addDate("startDate",DateUtil.date(System.currentTimeMillis()))
                .addString("name","xxkfz")
                .toJobParameters();
        jobLauncher.run(singleJob, jobParameters);
        return "The job is Run  start.......";
    }
}

9、主启动类上加上@EnableBatchProcessing注解。
10、测试。
访问:http://127.0.0.1:8081/batch/start 启动Job。
在这里插入图片描述
在这里插入图片描述
本案例项目代码获取方式:关注下方二维码,私信回复【Spring Batch学习项目】即可获取哦

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
9天前
|
Java 应用服务中间件 Maven
Spring Boot项目打war包(idea:多种方式)
Spring Boot项目打war包(idea:多种方式)
32 1
|
7天前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
9天前
|
Java 应用服务中间件 Spring
【JavaEE】Spring Boot - 项目的创建和使用(下)
【JavaEE】Spring Boot - 项目的创建和使用
13 0
|
9天前
|
数据可视化 Java 应用服务中间件
【JavaEE】Spring Boot - 项目的创建和使用(上)
【JavaEE】Spring Boot - 项目的创建和使用
16 0
|
9天前
|
存储 Java 对象存储
【JavaEE】DI与DL的介绍-Spring项目的创建-Bean对象的存储与获取
【JavaEE】DI与DL的介绍-Spring项目的创建-Bean对象的存储与获取
9 0
|
9天前
|
Java 关系型数据库 MySQL
【Java Spring开源项目】新蜂(NeeBee)商城项目运行、分析、总结
【Java Spring开源项目】新蜂(NeeBee)商城项目运行、分析、总结
161 4
|
9天前
|
XML Java 数据格式
Spring 项目如何使用AOP
Spring 项目如何使用AOP
28 2
|
缓存 Java Maven
spring中那些让你爱不释手的代码技巧(续集下)
spring中那些让你爱不释手的代码技巧(续集下)
spring中那些让你爱不释手的代码技巧(续集下)
|
消息中间件 Java 数据库连接
spring中那些让你爱不释手的代码技巧(续集上)
spring中那些让你爱不释手的代码技巧(续集)
spring中那些让你爱不释手的代码技巧(续集上)
|
9天前
|
Java 应用服务中间件 Maven
SpringBoot 项目瘦身指南
SpringBoot 项目瘦身指南
65 0