Spring Batch学习记录及示例项目代码

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Spring Batch学习记录及示例项目代码

在这里插入图片描述

Spring Batch学习记录总结

关于Spring Batch的相关知识点参考:https://mp.weixin.qq.com/s/OUMwyo2EopXkSHGn2OlghQ

Spring Batch的一些基本概念:
在这里插入图片描述

  • ItemReader:对资源的读处理,如从数据库查询、文件读取、变量读取等。

  • ItemProcessor:对读取的数据进行处理,可以实现自己的业务逻辑操作来对数据处理,如对数据进行计算、逻辑处理、格式转换等。

  • ItemWriter:对资源的写处理,如写入数据库、写入文件、打印日志等。

  • Step:一个完整的批处理步骤,一个Step是由ItemReader、ItemProcessor、ItemWriter三部分组成。

  • Job:代表一个完整的批处理过程,一个Job由一个或多个Step组成。

    在这里插入图片描述

  • Listener:监听器,可以对Step、Job状态进行监听,我们可以实现监听方法,对其进行一些逻辑处理,如打印日志等。

  • JobLauncher:负责启动Job。

  • JobRepository:存储关于配置和执行的Job(作业)的元数据。

简单使用

1、创建一个SpringBoot项目。

2、在项目中创建包config,并在该包下面创建一个Configuration类。

在类上面加上@Configuration;@EnableBatchProcessing;两个注解。

    @Configuration
    @EnableBatchProcessing
    public class SingleJobConfiguration {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * 程序执行的入口
         * @return
         */
        @Bean
        public Job helloJob(){
   
   
            return jobBuilderFactory.get("hellojpb-1")
                    .start(step1())
                    .build();
        }

        @Bean
        public Step step1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
    }

3、现在启动项目,发现控制台出现以下错误信息。

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-22 13:00:10.396 ERROR 15548 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.

Reason: Failed to determine a suitable driver class


​ Action:

Consider the following:
If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).

解决方法:由于Spring Batch在运行的时候需要数据库来存储一些具体的信息;因此我们需要配置具体的数据库信息。

方式一:配置内存数据库H2。

         <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
         </dependency>

方式二:配置Mysql数据库。

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.25</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

Mysql数据库连接信息:

    spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false
    spring.datasource.username=root
    spring.datasource.password=123456

    spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
    spring.batch.initialize-schema=always

Spring Batch具体分析

  • Step有两种实现方式。一个是tasklet的方式;一个是chunk方式。

    tasklet方式:

    在Step中,执行单个任务,Job有多个Step按照一定的顺序来组成的,每个步骤执行一个具体的任务。

    chunk方式:

    该方法是基于数据块(一部分数据)执行的,每个任务又都可以分为Read-Process-Write。

  • Spring Batch框架主要有四个角色:
    JobLauncher:任务启动器,通过它来启动任务,可以看作程序的入口。
    Job:代表一个具体的任务。
    Step:代表一个具体的步骤。一个Job有1个或多个Step构成。
    JobRepository:是存储数据的地方,可以看作一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息。

Step的两种实现方式:

   //方式1   chunk
        @Bean
        Step step() {
   
   
            return stepBuilderFactory.get(STEP)
                    .<Music, Music>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build();

        }
    //方式2   tasklet
        @Bean
        public Step step1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

Flow的创建和使用

  • Flow是多个Step的集合。
  • 可以被多个Job复用。
  • 使用FlowBuilder来创建。
    @Configuration
    @EnableBatchProcessing
    public class FlowDemo {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;


        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step flowDemoStep1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step flowDemoStep2() {
   
   
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step flowDemoStep3() {
   
   
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Flow 对象   ,指明Flow对象包含哪些Step
         */
        public Flow  flowDemoFlow(){
   
   
            return new FlowBuilder<Flow>("flowDemoFlow")
                    .start(flowDemoStep1())
                    .next(flowDemoStep2())
                    .build();
        }

        /**
         * 创建Job对象
         * @return
         */
        @Bean
        public Job  job(){
   
   
            return jobBuilderFactory.get("flowjob")
                    .start(flowDemoFlow())
                    .next(flowDemoStep3())
                    .end()
                    .build();

        }

split实现并发执行

实现任务中的多个Step或多个flow并发执行。

关键代码:

    @Configuration
    @EnableBatchProcessing
    public class SplitDemo {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step splitDemoStep1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step  splitDemoStep2() {
   
   
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step  splitDemoStep3() {
   
   
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

创建Flow 对象 ,指明Flow对象包含哪些Step:

      public Flow  splitDemoFlow1(){
   
   return new FlowBuilder<Flow>("splitDemoFlow1").start(splitDemoStep1()).build();}public Flow  splitDemoFlow2(){
   
   
            return new FlowBuilder<Flow>("splitDemoFlow2")
                    .start(splitDemoStep2())
                    .next(splitDemoStep3())
                    .build();
        }
        /**
         * 创建任务
         * @return
         */
        @Bean
        public Job  job(){
   
   
            return jobBuilderFactory.get("splitDemoJob")
                    .start(splitDemoFlow1())
                    .split(new SimpleAsyncTaskExecutor())
                    .add(splitDemoFlow2())
                    .end()
                    .build();
        }
    }

决策器的使用

接口:JobExecutionDecider

自定义决策器:MyDecider.java,实现JobExecutionDecider接口并且重写decide方法

public class MyDecider implements JobExecutionDecider {
   
   
    private int count;
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
   
   
        count ++;
        if(count % 2 == 0){
   
   
            return new FlowExecutionStatus("even");
        }else {
   
   
            return new FlowExecutionStatus("odd");
        }
    }
}
@Configuration
@EnableBatchProcessing
public class DeciderDemo {
   
   

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;



    @Bean
    public Step deciderDemoStep1(){
   
   
       return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
   
   
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                System.out.println("hello spring deciderDemoStep1");
                return RepeatStatus.FINISHED;
        }
        }).build();
    }

    @Bean
    public Step deciderDemoStep2() {
   
   
        return stepBuilderFactory.get("deciderDemoStep2").tasklet(
                new Tasklet() {
   
   
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                        System.out.println("even");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }
    @Bean
    public Step deciderDemoStep3() {
   
   
        return stepBuilderFactory.get("deciderDemoStep3").tasklet(
                new Tasklet() {
   
   
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                        System.out.println("odd");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }


    /**
     * 创建决策器
     * @return
     */
    @Bean
    public JobExecutionDecider myDecider(){
   
   
        return new MyDecider();
    }

    @Bean
    public Job  deciderDemoJob(){
   
   
        return jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                .from(deciderDemoStep3()).on("*").to(myDecider())
                .end()
                .build();
    }
}

完整例子

主要代码如下:
1、项目总体结构图。
在这里插入图片描述
2、在pom.xml文件中引入依赖。

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--引入Spring Batch-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

       <!--引入mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>

        <!--引入jdbc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

       <!-- 引入mybatis-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

<!--        引入其他工具包-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

3、配置数据库的连接信息。

server:
  port: 8081

#  配置Mysql连接信息
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/xk_test?useSSL=false
    username: root
    password: root
    schema: classpath:/org/springframework/batch/core/schema-mysql.sql
  batch:
    initialize-schema: always
    job:
      enabled: false

4、读数据CityDataReader.java

@Service
@Slf4j
@StepScope
public class CityDataReader implements ItemReader<City> {
   
   

    @Autowired
    private CityService cityService;

    List<City> cityInfoLists = null;
    private int nextCityIndex = 0;

    @Override
    public City read() throws Exception {
   
   
        this.init();
        City city = null;
        if(nextCityIndex < cityInfoLists.size()){
   
   
            city = cityInfoLists.get(nextCityIndex);
            nextCityIndex ++;
            return city;
        }
        return null;
    }

    private void init() {
   
   
       cityInfoLists = cityService.getCityInfoLists();
    }
}

5、处理数据CityDataProcess.java

@Slf4j
@Service
@StepScope
public class CityDataProcess implements ItemProcessor<City,City> {
   
   

    @Value("#{jobParameters['startDate']}")
    private Date date;

    @Value("#{jobParameters['name']}")
    private String name;

    @Override
    public City process(City city) throws Exception {
   
   
        log.info("开始处理第{}条数据----参数{}:{}.........",city.getId(),date,name);
        String detail = city.getProvince() + "-" + city.getCity() + "-" + city.getDistrict();
        city.setDetail(detail);
        log.info("第{}条数据已处理完成......",city.getId());

        return city;
    }
}

6、写数据CityDataWriter.java

@Service
@StepScope
@Slf4j
public class CityDataWriter implements ItemWriter<City> {
   
   

    @Autowired
    private CityService cityService;

    @Value("#{jobParameters['startDate']}")
    private Date date;

    @Value("#{jobParameters['name']}")
    private String name;

    @Override
    public void write(List<? extends City> cities) throws Exception {
   
   
        for(City city : cities) {
   
   
            log.info("第{}条数据开始更新.........}", city.getId());
            int update = cityService.update(city);
            if (update > 0 ) {
   
   
                log.info("第{}条数据更新完成.........}", city.getId());
            }
        }
    }
}

7、配置Job。JobConfig.java

@Configuration
public class JobConfig {
   
   

    private static final Logger log = LoggerFactory.getLogger(SimpleJobConfig.class);

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    SqlSessionFactory sqlSessionFactory;

    @Autowired
    private CityDataReader cityDataReader;

    @Autowired
    private CityDataProcess cityDataProcess;

    @Autowired
    private CityDataWriter cityDataWriter;

    @Autowired
    private MyTasketOne myTasketOne;

    //配置一个Job
    @Bean("singleJob")
    public Job singleStepJob() {
   
   
        return jobBuilderFactory.get("singleJob")
                .start(singlestep())
                .next(singlestep2())
                .listener(new MyJobExecutionListener())
                .build();
    }

    //配置第一个处理Step
    @Bean
    public Step singlestep() {
   
   
        return stepBuilderFactory.get("singlestep")
                .<City,City>chunk(9)
                .reader(cityDataReader)          // 读数据逻辑
                .processor(cityDataProcess)   //数据处理逻辑——业务处理
                .writer(cityDataWriter)         //写数据逻辑
                .listener(new MyStepExecutionListener())  //配置监听器
                .build();
    }

    //配置第二个处理Step
    @Bean
    public Step singlestep2() {
   
   
        return stepBuilderFactory.get("singlestep2")
                .tasklet(myTasketOne)
                .listener(new MyStepExecutionListener())  //配置监听器
                .build();
    }
}

8、创建controller,启动Job。

@RestController
@RequestMapping("batch")
public class JobStartController {
   
   

    @Autowired
    private Job singleJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 启动 Job
     * @return
     * @throws Exception
     */
    @GetMapping("/start")
    public String invokeStep() throws Exception {
   
   
        JobParameters  jobParameters = new JobParametersBuilder()
                .addDate("startDate",DateUtil.date(System.currentTimeMillis()))
                .addString("name","xxkfz")
                .toJobParameters();
        jobLauncher.run(singleJob, jobParameters);
        return "The job is Run  start.......";
    }
}

9、主启动类上加上@EnableBatchProcessing注解。
10、测试。
访问:http://127.0.0.1:8081/batch/start 启动Job。
在这里插入图片描述
在这里插入图片描述
本案例项目代码获取方式:关注下方二维码,私信回复【Spring Batch学习项目】即可获取哦

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
21天前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
45 9
|
17天前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
30 2
|
2月前
|
设计模式 前端开发 Java
Spring MVC——项目创建和建立请求连接
MVC是一种软件架构设计模式,将应用分为模型、视图和控制器三部分。Spring MVC是基于MVC模式的Web框架,通过`@RequestMapping`等注解实现URL路由映射,支持GET和POST请求,并可传递参数。创建Spring MVC项目与Spring Boot类似,使用`@RestController`注解标记控制器类。
38 1
Spring MVC——项目创建和建立请求连接
|
2月前
|
Java 关系型数据库 MySQL
Maven——创建 Spring Boot项目
Maven 是一个项目管理工具,通过配置 `pom.xml` 文件自动获取所需的 jar 包,简化了项目的构建和管理过程。其核心功能包括项目构建和依赖管理,支持创建、编译、测试、打包和发布项目。Maven 仓库分为本地仓库和远程仓库,远程仓库包括中央仓库、私服和其他公共库。此外,文档还介绍了如何创建第一个 SpringBoot 项目并实现简单的 HTTP 请求响应。
132 1
Maven——创建 Spring Boot项目
|
26天前
|
缓存 监控 Java
|
2月前
|
前端开发 Java 数据库
SpringBoot学习
【10月更文挑战第7天】Spring学习
36 9
|
2月前
|
XML Java 数据格式
Spring学习
【10月更文挑战第6天】Spring学习
21 1
|
2月前
|
Java Apache Maven
Java/Spring项目的包开头为什么是com?
本文介绍了 Maven 项目的初始结构,并详细解释了 Java 包命名惯例中的域名反转规则。通过域名反转(如 `com.example`),可以确保包名的唯一性,避免命名冲突,提高代码的可读性和逻辑分层。文章还讨论了域名反转的好处,包括避免命名冲突、全球唯一性、提高代码可读性和逻辑分层。最后,作者提出了一个关于包名的问题,引发读者思考。
Java/Spring项目的包开头为什么是com?
|
3月前
|
SQL 监控 druid
springboot-druid数据源的配置方式及配置后台监控-自定义和导入stater(推荐-简单方便使用)两种方式配置druid数据源
这篇文章介绍了如何在Spring Boot项目中配置和监控Druid数据源,包括自定义配置和使用Spring Boot Starter两种方法。
|
2月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
187 2