Spring Batch轻量级批处理实战

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Spring Batch轻量级批处理实战

1 实战前的理论基础

1.1 Spring Batch是什么

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统日常运营至关重要的强大的批处理应用程序。同时使开发人员在必要时可以轻松访问和利用更先进的企业服务。Spring Batch 不是调度框架,它旨在与调度程序一起工作,而不是取代调度程序。

1.2 Spring Batch能做什么
  • 自动化、复杂的大量信息处理,无需用户交互即可最有效地处理。这些操作通常包括基于时间的事件(例如月末计算、通知或通信)。
  • 定期应用在非常大的数据集上重复处理的复杂业务规则(例如,保险福利确定或费率调整)。
  • 将从内部和外部系统接收的信息集成到记录系统中,这些信息通常需要以事务方式进行格式化、验证和处理。批处理用于每天为企业处理数十亿笔交易。

业务场景:

  • 定期提交批处理
  • 并发批处理:作业的并行处理
  • 分阶段的、企业消息驱动的处理
  • 大规模并行批处理
  • 失败后手动或计划重启
  • 依赖步骤的顺序处理(扩展到工作流驱动的批处理)
  • 部分处理:跳过记录(例如,在回滚时)
  • 整批事务,适用于小批量或现有存储过程/脚本的情况

总之Spring batch可以做的:

  • 从数据库、文件或队列中读取大量记录。
  • 以某种方式处理数据。
  • 以修改后的形式写回数据。
1.3 基础架构

1.4 核心概念和抽象

核心概念:一个 Job 有一对多的Step,每个步骤都正好有一个 ItemReader、一个ItemProcessor和 一个ItemWriter。需要启动作业(使用 JobLauncher),并且需要存储有关当前运行进程的元数据(在 中 JobRepository)。

2 各个组件介绍

2.1 Job

Job是封装了整个批处理过程的实体。与其他 Spring 项目一样,一个Job与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以被称为“作业配置”。

可配置项:

  • 作业的简单名称。
  • Step实例的定义和排序。
  • 作业是否可重新启动。
2.2 Step

一个Step是一个域对象,它封装了批处理作业的一个独立的、连续的阶段。因此,每个 Job 完全由一个或多个步骤组成。一个Step包含定义和控制实际批处理所需的所有信息。

一个StepExecution代表一次尝试执行一个StepStepExecution 每次Step运行时都会创建一个新的,类似于JobExecution

2.3 ExecutionContext

一个ExecutionContext表示由框架持久化和控制的键/值对的集合,以允许开发人员有一个地方来存储范围为StepExecution对象或JobExecution对象的持久状态。

2.4 JobRepository

JobRepository是上述所有 Stereotypes 的持久性机制。它提供了CRUD操作JobLauncherJob以及Step实现。当 Job第一次启动,一个JobExecution被从库中获得,并且,执行的过程中,StepExecutionJobExecution实施方式是通过将它们传递到存储库持续。

使用 Java 配置时,@EnableBatchProcessing注解提供了一个 JobRepository作为开箱即用自动配置的组件之一。

2.5 JobLauncher

JobLauncher表示一个简单的接口,用于Job使用给定的 集合 启动JobParameters,如以下示例所示:

public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

期望实现JobExecution从 中 获得有效JobRepository并执行Job

2.6 Item Reader

ItemReader是一种抽象,表示一次检索Step一个项目的输入。当ItemReader用完它可以提供的项目时,它通过返回来表明这一点null

2.7 Item Writer

ItemWriter是一种抽象,表示一次一个Step、一批或一大块项目的输出。通常, anItemWriter不知道它接下来应该接收的输入,并且只知道在其当前调用中传递的项目。

2.8 Item Processor

ItemProcessor是表示项目的业务处理的抽象。当ItemReader读取一个项目并ItemWriter写入它们时,它 ItemProcessor提供了一个访问点来转换或应用其他业务处理。如果在处理该项目时确定该项目无效,则返回 null表示不应写出该项目。

3 Spring Batch实战

下面就利用我们所学的理论实现一个最简单的Spring Batch批处理项目

3.1 依赖和项目结构以及配置文件

依赖

<!--Spring batch-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- web依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>
<!--  mysql-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<!--  mybatis-->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.2.0</version>
</dependency>

项目结构

配置文件

server.port=9000
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=12345
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
3.2 代码和数据表

数据表

CREATE TABLE `student` (
    `id` int(100) NOT NULL AUTO_INCREMENT,
    `name` varchar(45) DEFAULT NULL,
    `age` int(2) DEFAULT NULL,
    `address` varchar(45) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT

Student实体类

/**
 * @desc: Student实体类
 * @author: YanMingXin
 * @create: 2021/10/15-12:17
 **/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@ToString
@TableName("student")
public class Student {
    @TableId(value = "id", type = IdType.AUTO)
    private Long sId;
    @TableField("name")
    private String sName;
    @TableField("age")
    private Integer sAge;
    @TableField("address")
    private String sAddress;
}

Mapper层

/**
 * @desc: Mapper层
 * @author: YanMingXin
 * @create: 2021/10/15-12:17
 **/
@Mapper
@Repository
public interface StudentDao extends BaseMapper<Student> {
}

模拟数据库(文件)中读取类

/**
 * @desc: 模拟数据库中读取
 * @author: YanMingXin
 * @create: 2021/10/16-10:13
 **/
public class StudentVirtualDao {
    /**
     * 模拟从数据库中读取
     *
     * @return
     */
    public List<Student> getStudents() {
        ArrayList<Student> students = new ArrayList<>();
        students.add(new Student(1L, "zs", 23, "Beijing"));
        students.add(new Student(2L, "ls", 23, "Beijing"));
        students.add(new Student(3L, "ww", 23, "Beijing"));
        students.add(new Student(4L, "zl", 23, "Beijing"));
        students.add(new Student(5L, "mq", 23, "Beijing"));
        students.add(new Student(6L, "gb", 23, "Beijing"));
        students.add(new Student(7L, "lj", 23, "Beijing"));
        students.add(new Student(8L, "ss", 23, "Beijing"));
        students.add(new Student(9L, "zsdd", 23, "Beijing"));
        students.add(new Student(10L, "zss", 23, "Beijing"));
        return students;
    }
}

Service层接口

/**
 * @desc:
 * @author: YanMingXin
 * @create: 2021/10/15-12:16
 **/
public interface StudentService {
    List<Student> selectStudentsFromDB();
    void insertStudent(Student student);
}

Service层实现类

/**
 * @desc: Service层实现类
 * @author: YanMingXin
 * @create: 2021/10/15-12:16
 **/
@Service
public class StudentServiceImpl implements StudentService {
    @Autowired
    private StudentDao studentDao;
    @Override
    public List<Student> selectStudentsFromDB() {
        return studentDao.selectList(null);
    }
    @Override
    public void insertStudent(Student student) {
        studentDao.insert(student);
    }
}

最核心的配置类BatchConfiguration

/**
 * @desc: BatchConfiguration
 * @author: YanMingXin
 * @create: 2021/10/15-12:25
 **/
@Configuration
@EnableBatchProcessing
@SuppressWarnings("all")
public class BatchConfiguration {
    /**
     * 注入JobBuilderFactory
     */
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    /**
     * 注入StepBuilderFactory
     */
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    /**
     * 注入JobRepository
     */
    @Autowired
    public JobRepository jobRepository;
    /**
     * 注入JobLauncher
     */
    @Autowired
    private JobLauncher jobLauncher;
    /**
     * 注入自定义StudentService
     */
    @Autowired
    private StudentService studentService;
    /**
     * 注入自定义job
     */
    @Autowired
    private Job studentJob;
    /**
     * 封装writer bean
     *
     * @return
     */
    @Bean
    public ItemWriter<Student> writer() {
        ItemWriter<Student> writer = new ItemWriter() {
            @Override
            public void write(List list) throws Exception {
                //debug发现是嵌套的List reader的线程List嵌套真正的List
                list.forEach((stu) -> {
                    for (Student student : (ArrayList<Student>) stu) {
                        studentService.insertStudent(student);
                    }
                });
            }
        };
        return writer;
    }
    /**
     * 封装reader bean
     *
     * @return
     */
    @Bean
    public ItemReader<Student> reader() {
        ItemReader<Student> reader = new ItemReader() {
            @Override
            public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                //模拟数据获取
                StudentVirtualDao virtualDao = new StudentVirtualDao();
                return virtualDao.getStudents();
            }
        };
        return reader;
    }
    /**
     * 封装processor bean
     *
     * @return
     */
    @Bean
    public ItemProcessor processor() {
        ItemProcessor processor = new ItemProcessor() {
            @Override
            public Object process(Object o) throws Exception {
                //debug发现o就是reader单次单线程读取的数据
                return o;
            }
        };
        return processor;
    }
    /**
     * 封装自定义step
     *
     * @return
     */
    @Bean
    public Step studentStepOne() {
        return stepBuilderFactory.get("studentStepOne")
            .chunk(1)
            .reader(reader()) //加入reader
            .processor(processor())  //加入processor
            .writer(writer())//加入writer
            .build();
    }
    /**
     * 封装自定义job
     *
     * @return
     */
    @Bean
    public Job studentJob() {
        return jobBuilderFactory.get("studentJob")
            .flow(studentStepOne())//加入step
            .end()
            .build();
    }
    /**
     * 使用spring 定时任务执行
     */
    @Scheduled(fixedRate = 5000)
    public void printMessage() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
            jobLauncher.run(studentJob, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3.3 测试

项目启动1s之后

看数据库,除了我们实体类定义的表以外多出来这么多表,这些表都是spring batch自带的记录日志和错误的表,具体的字段含义的有待研究

4 实战后的总结

Spring Batch有非常快的写入和读取速度,但是带来的影响就是非常耗费内存和数据库连接池的资源如果使用不好的话还会发生异常,因此我们要进行正确的配置,接下来我们进行简单的源码探究:

4.1 JobBuilderFactory

job的获取使用了简单工厂模式和建造者模式JobBuilderFactory获取JobBuilder在经过配置返回一个job对象的实例,该实例就是Spring Batch中最顶级的组件,包含了n和step

public class JobBuilderFactory {
   private JobRepository jobRepository;
   public JobBuilderFactory(JobRepository jobRepository) {
      this.jobRepository = jobRepository;
   }
   //返回JobBuilder
   public JobBuilder get(String name) {
      JobBuilder builder = new JobBuilder(name).repository(jobRepository);
      return builder;
   }
}

jobBuilder类

public class JobBuilder extends JobBuilderHelper<JobBuilder> {
   /**
    * 为指定名称的作业创建一个新的构建器
    */
   public JobBuilder(String name) {
      super(name);
   }
   /**
    * 创建将执行步骤或步骤序列的新作业构建器。
    */
   public SimpleJobBuilder start(Step step) {
      return new SimpleJobBuilder(this).start(step);
   }
   /**
    * 创建将执行流的新作业构建器。
    */
   public JobFlowBuilder start(Flow flow) {
      return new FlowJobBuilder(this).start(flow);
   }
   /**
    * 创建将执行步骤或步骤序列的新作业构建器
    */
   public JobFlowBuilder flow(Step step) {
      return new FlowJobBuilder(this).start(step);
   }
}
4.2 StepBuilderFactory

直接看StepBuilder类

public class StepBuilder extends StepBuilderHelper<StepBuilder> {
   public StepBuilder(String name) {
      super(name);
   }
   /**
    * 用自定义微线程构建步骤,不一定是项处理。  
    */
   public TaskletStepBuilder tasklet(Tasklet tasklet) {
      return new TaskletStepBuilder(this).tasklet(tasklet);
   }
   /**
    * 构建一个步骤,按照提供的大小以块的形式处理项。为了将这一步扩展到容错,
    * 在构建器上调用SimpleStepBuilder的 faultolerant()方法。
    * @param <I> 输入类型
    * @param <O> 输出类型
    */
   public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
      return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
   }
   public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
      return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
   }
   public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
      return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
   }
   public PartitionStepBuilder partitioner(Step step) {
      return new PartitionStepBuilder(this).step(step);
   }
   public JobStepBuilder job(Job job) {
      return new JobStepBuilder(this).job(job);
   }
   /**
    * 创建将执行流的新步骤构建器。
    */
   public FlowStepBuilder flow(Flow flow) {
      return new FlowStepBuilder(this).flow(flow);
   }
}

参考文档:

https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/index.html

https://www.jdon.com/springbatch.html

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
5月前
|
XML Java 测试技术
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
这篇文章介绍了Spring5框架的三个新特性:支持@Nullable注解以明确方法返回、参数和属性值可以为空;引入函数式风格的GenericApplicationContext进行对象注册和管理;以及如何整合JUnit5进行单元测试,同时讨论了JUnit4与JUnit5的整合方法,并提出了关于配置文件加载的疑问。
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
|
2月前
|
监控 Java 数据库连接
详解Spring Batch:在Spring Boot中实现高效批处理
详解Spring Batch:在Spring Boot中实现高效批处理
270 12
|
3月前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
405 6
|
3月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
188 2
|
3月前
|
Java 数据库连接 Spring
【2021Spring编程实战笔记】Spring开发分享~(下)
【2021Spring编程实战笔记】Spring开发分享~(下)
38 1
|
3月前
|
XML Java 数据格式
Spring IOC容器的深度解析及实战应用
【10月更文挑战第14天】在软件工程中,随着系统规模的扩大,对象间的依赖关系变得越来越复杂,这导致了系统的高耦合度,增加了开发和维护的难度。为解决这一问题,Michael Mattson在1996年提出了IOC(Inversion of Control,控制反转)理论,旨在降低对象间的耦合度,提高系统的灵活性和可维护性。Spring框架正是基于这一理论,通过IOC容器实现了对象间的依赖注入和生命周期管理。
91 0
|
3月前
|
XML Java 数据库连接
【2020Spring编程实战笔记】Spring开发分享~(上)
【2020Spring编程实战笔记】Spring开发分享~
61 0
|
5月前
|
NoSQL Java Redis
Redis6入门到实战------ 八、Redis与Spring Boot整合
这篇文章详细介绍了如何在Spring Boot项目中整合Redis,包括在`pom.xml`中添加依赖、配置`application.properties`文件、创建配置类以及编写测试类来验证Redis的连接和基本操作。
Redis6入门到实战------ 八、Redis与Spring Boot整合
|
5月前
|
SQL 数据库
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
这篇文章是Spring5框架的实战教程,深入讲解了如何使用JdbcTemplate进行数据库的批量操作,包括批量添加、批量修改和批量删除的具体代码实现和测试过程,并通过完整的项目案例展示了如何在实际开发中应用这些技术。
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
|
5月前
|
XML Java 数据格式
Spring5入门到实战------11、使用XML方式实现AOP切面编程。具体代码+讲解
这篇文章是Spring5框架的AOP切面编程教程,通过XML配置方式,详细讲解了如何创建被增强类和增强类,如何在Spring配置文件中定义切入点和切面,以及如何将增强逻辑应用到具体方法上。文章通过具体的代码示例和测试结果,展示了使用XML配置实现AOP的过程,并强调了虽然注解开发更为便捷,但掌握XML配置也是非常重要的。
Spring5入门到实战------11、使用XML方式实现AOP切面编程。具体代码+讲解