Spring Batch学习记录及示例项目代码

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: Spring Batch学习记录及示例项目代码

在这里插入图片描述

Spring Batch学习记录总结

关于Spring Batch的相关知识点参考:https://mp.weixin.qq.com/s/OUMwyo2EopXkSHGn2OlghQ

Spring Batch的一些基本概念:
在这里插入图片描述

  • ItemReader:对资源的读处理,如从数据库查询、文件读取、变量读取等。

  • ItemProcessor:对读取的数据进行处理,可以实现自己的业务逻辑操作来对数据处理,如对数据进行计算、逻辑处理、格式转换等。

  • ItemWriter:对资源的写处理,如写入数据库、写入文件、打印日志等。

  • Step:一个完整的批处理步骤,一个Step是由ItemReader、ItemProcessor、ItemWriter三部分组成。

  • Job:代表一个完整的批处理过程,一个Job由一个或多个Step组成。

    在这里插入图片描述

  • Listener:监听器,可以对Step、Job状态进行监听,我们可以实现监听方法,对其进行一些逻辑处理,如打印日志等。

  • JobLauncher:负责启动Job。

  • JobRepository:存储关于配置和执行的Job(作业)的元数据。

简单使用

1、创建一个SpringBoot项目。

2、在项目中创建包config,并在该包下面创建一个Configuration类。

在类上面加上@Configuration;@EnableBatchProcessing;两个注解。

    @Configuration
    @EnableBatchProcessing
    public class SingleJobConfiguration {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * 程序执行的入口
         * @return
         */
        @Bean
        public Job helloJob(){
   
   
            return jobBuilderFactory.get("hellojpb-1")
                    .start(step1())
                    .build();
        }

        @Bean
        public Step step1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }
    }

3、现在启动项目,发现控制台出现以下错误信息。

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-22 13:00:10.396 ERROR 15548 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.

Reason: Failed to determine a suitable driver class


​ Action:

Consider the following:
If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).

解决方法:由于Spring Batch在运行的时候需要数据库来存储一些具体的信息;因此我们需要配置具体的数据库信息。

方式一:配置内存数据库H2。

         <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
         </dependency>

方式二:配置Mysql数据库。

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.25</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

Mysql数据库连接信息:

    spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false
    spring.datasource.username=root
    spring.datasource.password=123456

    spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
    spring.batch.initialize-schema=always

Spring Batch具体分析

  • Step有两种实现方式。一个是tasklet的方式;一个是chunk方式。

    tasklet方式:

    在Step中,执行单个任务,Job有多个Step按照一定的顺序来组成的,每个步骤执行一个具体的任务。

    chunk方式:

    该方法是基于数据块(一部分数据)执行的,每个任务又都可以分为Read-Process-Write。

  • Spring Batch框架主要有四个角色:
    JobLauncher:任务启动器,通过它来启动任务,可以看作程序的入口。
    Job:代表一个具体的任务。
    Step:代表一个具体的步骤。一个Job有1个或多个Step构成。
    JobRepository:是存储数据的地方,可以看作一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息。

Step的两种实现方式:

   //方式1   chunk
        @Bean
        Step step() {
   
   
            return stepBuilderFactory.get(STEP)
                    .<Music, Music>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build();

        }
    //方式2   tasklet
        @Bean
        public Step step1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring batch");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

Flow的创建和使用

  • Flow是多个Step的集合。
  • 可以被多个Job复用。
  • 使用FlowBuilder来创建。
    @Configuration
    @EnableBatchProcessing
    public class FlowDemo {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;


        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step flowDemoStep1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step flowDemoStep2() {
   
   
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step flowDemoStep3() {
   
   
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring flowDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Flow 对象   ,指明Flow对象包含哪些Step
         */
        public Flow  flowDemoFlow(){
   
   
            return new FlowBuilder<Flow>("flowDemoFlow")
                    .start(flowDemoStep1())
                    .next(flowDemoStep2())
                    .build();
        }

        /**
         * 创建Job对象
         * @return
         */
        @Bean
        public Job  job(){
   
   
            return jobBuilderFactory.get("flowjob")
                    .start(flowDemoFlow())
                    .next(flowDemoStep3())
                    .end()
                    .build();

        }

split实现并发执行

实现任务中的多个Step或多个flow并发执行。

关键代码:

    @Configuration
    @EnableBatchProcessing
    public class SplitDemo {
   
   

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * 创建 Step1
         * @return
         */
        @Bean
        public Step splitDemoStep1() {
   
   
            return stepBuilderFactory.get("step-1").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep1");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step2
         * @return
         */
        @Bean
        public Step  splitDemoStep2() {
   
   
            return stepBuilderFactory.get("step-2").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep2");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

        /**
         * 创建Step3
         * @return
         */
        @Bean
        public Step  splitDemoStep3() {
   
   
            return stepBuilderFactory.get("step-3").tasklet(
                    new Tasklet() {
   
   
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                            System.out.println("hello spring splitDemoStep3");
                            return RepeatStatus.FINISHED;
                        }
                    }
            ).build();
        }

创建Flow 对象 ,指明Flow对象包含哪些Step:

      public Flow  splitDemoFlow1(){
   
   return new FlowBuilder<Flow>("splitDemoFlow1").start(splitDemoStep1()).build();}public Flow  splitDemoFlow2(){
   
   
            return new FlowBuilder<Flow>("splitDemoFlow2")
                    .start(splitDemoStep2())
                    .next(splitDemoStep3())
                    .build();
        }
        /**
         * 创建任务
         * @return
         */
        @Bean
        public Job  job(){
   
   
            return jobBuilderFactory.get("splitDemoJob")
                    .start(splitDemoFlow1())
                    .split(new SimpleAsyncTaskExecutor())
                    .add(splitDemoFlow2())
                    .end()
                    .build();
        }
    }

决策器的使用

接口:JobExecutionDecider

自定义决策器:MyDecider.java,实现JobExecutionDecider接口并且重写decide方法

public class MyDecider implements JobExecutionDecider {
   
   
    private int count;
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
   
   
        count ++;
        if(count % 2 == 0){
   
   
            return new FlowExecutionStatus("even");
        }else {
   
   
            return new FlowExecutionStatus("odd");
        }
    }
}
@Configuration
@EnableBatchProcessing
public class DeciderDemo {
   
   

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;



    @Bean
    public Step deciderDemoStep1(){
   
   
       return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
   
   
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                System.out.println("hello spring deciderDemoStep1");
                return RepeatStatus.FINISHED;
        }
        }).build();
    }

    @Bean
    public Step deciderDemoStep2() {
   
   
        return stepBuilderFactory.get("deciderDemoStep2").tasklet(
                new Tasklet() {
   
   
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                        System.out.println("even");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }
    @Bean
    public Step deciderDemoStep3() {
   
   
        return stepBuilderFactory.get("deciderDemoStep3").tasklet(
                new Tasklet() {
   
   
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
   
   
                        System.out.println("odd");
                        return RepeatStatus.FINISHED;
                    }
                }
        ).build();
    }


    /**
     * 创建决策器
     * @return
     */
    @Bean
    public JobExecutionDecider myDecider(){
   
   
        return new MyDecider();
    }

    @Bean
    public Job  deciderDemoJob(){
   
   
        return jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                .from(deciderDemoStep3()).on("*").to(myDecider())
                .end()
                .build();
    }
}

完整例子

主要代码如下:
1、项目总体结构图。
在这里插入图片描述
2、在pom.xml文件中引入依赖。

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--引入Spring Batch-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

       <!--引入mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>

        <!--引入jdbc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

       <!-- 引入mybatis-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

<!--        引入其他工具包-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

3、配置数据库的连接信息。

server:
  port: 8081

#  配置Mysql连接信息
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/xk_test?useSSL=false
    username: root
    password: root
    schema: classpath:/org/springframework/batch/core/schema-mysql.sql
  batch:
    initialize-schema: always
    job:
      enabled: false

4、读数据CityDataReader.java

@Service
@Slf4j
@StepScope
public class CityDataReader implements ItemReader<City> {
   
   

    @Autowired
    private CityService cityService;

    List<City> cityInfoLists = null;
    private int nextCityIndex = 0;

    @Override
    public City read() throws Exception {
   
   
        this.init();
        City city = null;
        if(nextCityIndex < cityInfoLists.size()){
   
   
            city = cityInfoLists.get(nextCityIndex);
            nextCityIndex ++;
            return city;
        }
        return null;
    }

    private void init() {
   
   
       cityInfoLists = cityService.getCityInfoLists();
    }
}

5、处理数据CityDataProcess.java

@Slf4j
@Service
@StepScope
public class CityDataProcess implements ItemProcessor<City,City> {
   
   

    @Value("#{jobParameters['startDate']}")
    private Date date;

    @Value("#{jobParameters['name']}")
    private String name;

    @Override
    public City process(City city) throws Exception {
   
   
        log.info("开始处理第{}条数据----参数{}:{}.........",city.getId(),date,name);
        String detail = city.getProvince() + "-" + city.getCity() + "-" + city.getDistrict();
        city.setDetail(detail);
        log.info("第{}条数据已处理完成......",city.getId());

        return city;
    }
}

6、写数据CityDataWriter.java

@Service
@StepScope
@Slf4j
public class CityDataWriter implements ItemWriter<City> {
   
   

    @Autowired
    private CityService cityService;

    @Value("#{jobParameters['startDate']}")
    private Date date;

    @Value("#{jobParameters['name']}")
    private String name;

    @Override
    public void write(List<? extends City> cities) throws Exception {
   
   
        for(City city : cities) {
   
   
            log.info("第{}条数据开始更新.........}", city.getId());
            int update = cityService.update(city);
            if (update > 0 ) {
   
   
                log.info("第{}条数据更新完成.........}", city.getId());
            }
        }
    }
}

7、配置Job。JobConfig.java

@Configuration
public class JobConfig {
   
   

    private static final Logger log = LoggerFactory.getLogger(SimpleJobConfig.class);

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    SqlSessionFactory sqlSessionFactory;

    @Autowired
    private CityDataReader cityDataReader;

    @Autowired
    private CityDataProcess cityDataProcess;

    @Autowired
    private CityDataWriter cityDataWriter;

    @Autowired
    private MyTasketOne myTasketOne;

    //配置一个Job
    @Bean("singleJob")
    public Job singleStepJob() {
   
   
        return jobBuilderFactory.get("singleJob")
                .start(singlestep())
                .next(singlestep2())
                .listener(new MyJobExecutionListener())
                .build();
    }

    //配置第一个处理Step
    @Bean
    public Step singlestep() {
   
   
        return stepBuilderFactory.get("singlestep")
                .<City,City>chunk(9)
                .reader(cityDataReader)          // 读数据逻辑
                .processor(cityDataProcess)   //数据处理逻辑——业务处理
                .writer(cityDataWriter)         //写数据逻辑
                .listener(new MyStepExecutionListener())  //配置监听器
                .build();
    }

    //配置第二个处理Step
    @Bean
    public Step singlestep2() {
   
   
        return stepBuilderFactory.get("singlestep2")
                .tasklet(myTasketOne)
                .listener(new MyStepExecutionListener())  //配置监听器
                .build();
    }
}

8、创建controller,启动Job。

@RestController
@RequestMapping("batch")
public class JobStartController {
   
   

    @Autowired
    private Job singleJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 启动 Job
     * @return
     * @throws Exception
     */
    @GetMapping("/start")
    public String invokeStep() throws Exception {
   
   
        JobParameters  jobParameters = new JobParametersBuilder()
                .addDate("startDate",DateUtil.date(System.currentTimeMillis()))
                .addString("name","xxkfz")
                .toJobParameters();
        jobLauncher.run(singleJob, jobParameters);
        return "The job is Run  start.......";
    }
}

9、主启动类上加上@EnableBatchProcessing注解。
10、测试。
访问:http://127.0.0.1:8081/batch/start 启动Job。
在这里插入图片描述
在这里插入图片描述
本案例项目代码获取方式:关注下方二维码,私信回复【Spring Batch学习项目】即可获取哦

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
9天前
|
安全 Java 应用服务中间件
Spring Boot + Java 21:内存减少 60%,启动速度提高 30% — 零代码
通过调整三个JVM和Spring Boot配置开关,无需重写代码即可显著优化Java应用性能:内存减少60%,启动速度提升30%。适用于所有在JVM上运行API的生产团队,低成本实现高效能。
84 3
|
26天前
|
安全 IDE Java
Spring 的@FieldDefaults和@Data:Lombok 注解以实现更简洁的代码
本文介绍了如何在 Spring 应用程序中使用 Project Lombok 的 `@Data` 和 `@FieldDefaults` 注解来减少样板代码,提升代码可读性和可维护性,并探讨了其适用场景与限制。
Spring 的@FieldDefaults和@Data:Lombok 注解以实现更简洁的代码
|
2月前
|
人工智能 监控 安全
Spring AOP切面编程颠覆传统!3大核心注解+5种通知类型,让业务代码纯净如初
本文介绍了AOP(面向切面编程)的基本概念、优势及其在Spring Boot中的使用。AOP作为OOP的补充,通过将横切关注点(如日志、安全、事务等)与业务逻辑分离,实现代码解耦,提升模块化程度、可维护性和灵活性。文章详细讲解了Spring AOP的核心概念,包括切面、切点、通知等,并提供了在Spring Boot中实现AOP的具体步骤和代码示例。此外,还列举了AOP在日志记录、性能监控、事务管理和安全控制等场景中的实际应用。通过本文,开发者可以快速掌握AOP编程思想及其实践技巧。
|
3月前
|
Java 关系型数据库 数据库连接
Spring Boot项目集成MyBatis Plus操作PostgreSQL全解析
集成 Spring Boot、PostgreSQL 和 MyBatis Plus 的步骤与 MyBatis 类似,只不过在 MyBatis Plus 中提供了更多的便利功能,如自动生成 SQL、分页查询、Wrapper 查询等。
281 4
|
3月前
|
安全 Java Nacos
0代码改动实现Spring应用数据库帐密自动轮转
Nacos作为国内被广泛使用的配置中心,已经成为应用侧的基础设施产品,近年来安全问题被更多关注,这是中国国内软件行业逐渐迈向成熟的标志,也是必经之路,Nacos提供配置加密存储-运行时轮转的核心安全能力,将在应用安全领域承担更多职责。
|
3月前
|
Java 测试技术 Spring
简单学Spring Boot | 博客项目的测试
本内容介绍了基于Spring Boot的博客项目测试实践,重点在于通过测试驱动开发(TDD)优化服务层代码,提升代码质量和功能可靠性。案例详细展示了如何为PostService类编写测试用例、运行测试并根据反馈优化功能代码,包括两次优化过程。通过TDD流程,确保每项功能经过严格验证,增强代码可维护性与系统稳定性。
160 0
|
3月前
|
存储 Java 数据库连接
简单学Spring Boot | 博客项目的三层架构重构
本案例通过采用三层架构(数据访问层、业务逻辑层、表现层)重构项目,解决了集中式开发导致的代码臃肿问题。各层职责清晰,结合依赖注入实现解耦,提升了系统的可维护性、可测试性和可扩展性,为后续接入真实数据库奠定基础。
276 0
|
3月前
|
前端开发 Java API
酒店管理系统基于 JavaFX Spring Boot 和 React 经典项目重构实操
本文介绍了基于现代技术栈的酒店管理系统开发方案,整合了JavaFX、Spring Boot和React三大技术框架。系统采用前后端分离架构,JavaFX构建桌面客户端,React开发Web管理界面,Spring Boot提供RESTful API后端服务。核心功能模块包括客房管理和客户预订流程,文中提供了JavaFX实现的客房管理界面代码示例和React开发的预订组件代码,展示了如何实现客房信息展示、添加修改操作以及在线预订功能。
190 0
|
3月前
|
Java 应用服务中间件 Maven
第01课:Spring Boot开发环境搭建和项目启动
第01课:Spring Boot开发环境搭建和项目启动
499 0