Spring Batch实战全解析:从入门到精通,搞定企业级批处理难题

简介: 本文全面介绍了SpringBatch框架在企业级批处理应用中的核心技术与实战方案。文章首先阐述了批处理的典型特征(无交互性、海量数据、可靠性等)和SpringBatch的核心优势(轻量化、可扩展、事务安全等),并通过对比其他批处理方案突出其适用性。随后详细解析了SpringBatch的核心架构,包括JobLauncher、Job、Step等组件的职责分工,以及批处理执行流程。

在企业级应用开发中,批处理场景无处不在——数据迁移、报表生成、日志分析、数据清洗同步等。这些场景的核心需求是高效处理海量数据、保证数据一致性、支持失败重试与断点续跑。Spring Batch作为Spring生态专为批处理设计的框架,凭借其轻量级、可扩展、事务安全的特性,成为企业批处理方案的首选。本文将从底层逻辑到实战落地,全面拆解Spring Batch,结合可直接运行的实例,让你快速掌握企业级批处理开发能力。

一、批处理核心概念与Spring Batch优势

1.1 批处理的核心特征

批处理是指批量处理大量数据的无交互式操作,典型特征包括:

  • 无交互性:无需人工干预,自动完成数据处理
  • 海量数据:单次处理数据量从万级到亿级不等
  • 可靠性:支持失败重试、断点续跑,避免数据丢失或重复
  • 高效性:支持并行处理、分区处理,提升处理速度
  • 事务性:保证批量操作的原子性,要么全部成功,要么全部回滚

1.2 Spring Batch的核心优势

Spring Batch基于Spring框架开发,继承了Spring的依赖注入、AOP等特性,同时针对批处理场景做了深度优化,核心优势如下:

  • 轻量化:核心依赖小,易于集成到Spring Boot项目
  • 可扩展性:支持自定义ItemReader/Processor/Writer、自定义监听器等,适配复杂场景
  • 事务安全:内置事务管理机制,保证批处理数据一致性
  • 丰富的组件:提供开箱即用的Reader(数据库、文件、消息队列)、Writer,减少重复开发
  • 监控与运维:支持批处理任务的状态跟踪、日志记录、失败报警
  • 并行与分区:支持多线程并行处理、多节点分区处理,应对海量数据
  • 兼容性:兼容Spring Boot、MyBatis-Plus、MySQL等主流技术栈

1.3 Spring Batch与其他批处理方案对比

方案 优势 劣势 适用场景
Spring Batch 生态完善、可扩展强、事务安全 需额外配置,学习成本中等 企业级复杂批处理场景
原生Java线程 实现简单、轻量 无事务管理、无重试机制 简单小规模批处理
Quartz 定时能力强 批处理核心能力薄弱 定时触发场景,需配合批处理框架
Flink/Spark 分布式处理能力强 重量级、部署复杂 超大规模数据批处理/流批一体

二、Spring Batch核心架构与底层逻辑

2.1 核心架构图

image.png

2.2 核心组件职责拆解

1. JobLauncher:任务启动器

  • 核心职责:接收任务启动请求,触发Job执行
  • 底层逻辑:通过JobRepository获取Job的配置信息,创建JobExecution执行实例,触发Step执行
  • 适用场景:程序调用、定时任务触发(结合Quartz/Spring Scheduler)

2. Job:批处理任务

  • 核心职责:定义批处理任务的整体流程,包含一个或多个Step
  • 底层逻辑:Job是逻辑上的任务单元,通过JobParameters传递参数,JobExecution记录执行状态(成功/失败/运行中)
  • 关键特性:支持重启(restartable)、参数校验、任务依赖

3. Step:任务步骤

  • 核心职责:批处理的最小执行单元,包含数据读取(Reader)、处理(Processor)、写入(Writer)的完整流程
  • 底层逻辑:Step通过Chunk(块)处理数据,即读取一定数量的记录后批量处理并写入,减少IO开销;支持事务管理、重试、跳过
  • 两种执行模式:
  • Chunk-oriented:适用于海量数据,分块处理
  • Tasklet-oriented:适用于简单任务(如创建目录、执行SQL)

4. ItemReader/ItemProcessor/ItemWriter:数据处理三组件

  • ItemReader:数据读取组件,从数据源(数据库、文件、CSV、Excel、消息队列)读取数据
  • ItemProcessor:数据处理组件,实现数据清洗、转换、过滤、校验等逻辑
  • ItemWriter:数据写入组件,将处理后的数据写入目标数据源(数据库、文件、缓存)

5. JobRepository:任务仓库

  • 核心职责:持久化存储Job、Step的执行状态、参数、结果等信息
  • 底层逻辑:通过数据库表(如BATCH_JOB_INSTANCE、BATCH_JOB_EXECUTION)存储元数据,支持断点续跑、状态监控
  • 支持存储:MySQL、Oracle、PostgreSQL等关系型数据库,也支持内存(仅测试用)

6. TransactionManager:事务管理器

  • 核心职责:保证Step处理的事务一致性,支持Chunk级别的事务提交
  • 底层逻辑:默认绑定Spring的PlatformTransactionManager,每个Chunk的处理作为一个事务,失败则回滚整个Chunk

2.3 批处理执行流程(底层逻辑)

image.png

三、环境搭建(基于JDK17+Spring Boot3+Maven)

3.1 核心依赖(pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.5</version>
       <relativePath/>
   </parent>
   <groupId>com.jam.demo</groupId>
   <artifactId>spring-batch-demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>spring-batch-demo</name>
   <description>Spring Batch 实战演示项目</description>
   
   <properties>
       <java.version>17</java.version>
       <mybatis-plus.version>3.5.5</mybatis-plus.version>
       <fastjson2.version>2.0.49</fastjson2.version>
       <lombok.version>1.18.30</lombok.version>
   </properties>
   
   <dependencies>
       <!-- Spring Boot 核心依赖 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter</artifactId>
       </dependency>
       
       <!-- Spring Batch 核心依赖 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-batch</artifactId>
       </dependency>
       
       <!-- 数据库依赖 -->
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <scope>runtime</scope>
       </dependency>
       
       <!-- MyBatis-Plus 依赖 -->
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>${mybatis-plus.version}</version>
       </dependency>
       
       <!-- Lombok 依赖 -->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>${lombok.version}</version>
           <scope>provided</scope>
       </dependency>
       
       <!-- FastJSON2 依赖 -->
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>${fastjson2.version}</version>
       </dependency>
       
       <!-- Spring Web 依赖(用于接口触发任务) -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       
       <!-- Swagger3 依赖 -->
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>2.3.0</version>
       </dependency>
       
       <!-- Google Guava 集合工具 -->
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>33.2.1-jre</version>
       </dependency>
       
       <!-- 测试依赖 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
       <dependency>
           <groupId>org.springframework.batch</groupId>
           <artifactId>spring-batch-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>
   
   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
               <configuration>
                   <excludes>
                       <exclude>
                           <groupId>org.projectlombok</groupId>
                           <artifactId>lombok</artifactId>
                       </exclude>
                   </excludes>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

3.2 配置文件(application.yml)

spring:
 # 数据源配置
 datasource:
   url: jdbc:mysql://localhost:3306/spring_batch_demo?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
   username: root
   password: root
   driver-class-name: com.mysql.cj.jdbc.Driver
 # Spring Batch 配置
 batch:
   # 自动执行开关(默认true,测试时可关闭)
   job:
     enabled: false
   # 数据库初始化模式(默认EMBEDDED,生产环境用ALWAYS)
   jdbc:
     initialize-schema: ALWAYS

# MyBatis-Plus 配置
mybatis-plus:
 mapper-locations: classpath:mapper/*.xml
 type-aliases-package: com.jam.demo.entity
 configuration:
   map-underscore-to-camel-case: true
   log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

# 日志配置
logging:
 level:
   root: info
   com.jam.demo: debug
   org.springframework.batch: info

# 服务器配置
server:
 port: 8080

# Swagger3 配置
springdoc:
 api-docs:
   path: /api-docs
 swagger-ui:
   path: /swagger-ui.html
   operationsSorter: method

3.3 数据库初始化(MySQL8.0)

Spring Batch需要初始化元数据表存储任务状态,通过spring.batch.jdbc.initialize-schema: ALWAYS自动初始化,核心表说明:

  • BATCH_JOB_INSTANCE:任务实例表,存储Job的唯一实例
  • BATCH_JOB_EXECUTION:任务执行表,存储Job的每次执行记录
  • BATCH_STEP_EXECUTION:步骤执行表,存储Step的每次执行记录
  • BATCH_JOB_PARAMS:任务参数表,存储Job执行时的参数

同时创建业务表(本文实战案例用:用户原始表、用户清洗表):

-- 用户原始表(待处理数据)
CREATE TABLE `user_origin` (
 `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
 `username` varchar(50) NOT NULL COMMENT '用户名',
 `phone` varchar(20) DEFAULT NULL COMMENT '手机号',
 `email` varchar(100) DEFAULT NULL COMMENT '邮箱',
 `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户原始表';

-- 用户清洗表(处理后数据)
CREATE TABLE `user_clean` (
 `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
 `username` varchar(50) NOT NULL COMMENT '用户名',
 `phone` varchar(20) NOT NULL COMMENT '手机号(清洗后)',
 `email` varchar(100) NOT NULL COMMENT '邮箱(清洗后)',
 `process_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '处理时间',
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户清洗表';

-- 插入测试数据
INSERT INTO `user_origin` (`username`, `phone`, `email`, `create_time`)
VALUES
('张三', '13800138000', 'zhangsan@test.com', '2025-01-01 10:00:00'),
('李四', '13900139000', 'lisi@test.com', '2025-01-01 10:00:00'),
('王五', '13700137000', 'wangwu#test.com', '2025-01-01 10:00:00'), -- 邮箱格式错误
('赵六', '13600136000', '', '2025-01-01 10:00:00'), -- 邮箱为空
('孙七', '13500135000', 'sunqi@test.com', '2025-01-01 10:00:00');

四、核心组件实战(从基础到进阶)

4.1 实体类定义

4.1.1 UserOrigin(用户原始表实体)

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;

/**
* 用户原始表实体
* @author ken
*/

@Data
@TableName("user_origin")
public class UserOrigin {
   /**
    * 主键ID
    */

   @TableId(type = IdType.AUTO)
   private Long id;
   
   /**
    * 用户名
    */

   private String username;
   
   /**
    * 手机号
    */

   private String phone;
   
   /**
    * 邮箱
    */

   private String email;
   
   /**
    * 创建时间
    */

   private LocalDateTime createTime;
}

4.1.2 UserClean(用户清洗表实体)

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;

/**
* 用户清洗表实体
* @author ken
*/

@Data
@TableName("user_clean")
public class UserClean {
   /**
    * 主键ID
    */

   @TableId(type = IdType.AUTO)
   private Long id;
   
   /**
    * 用户名
    */

   private String username;
   
   /**
    * 手机号(清洗后)
    */

   private String phone;
   
   /**
    * 邮箱(清洗后)
    */

   private String email;
   
   /**
    * 处理时间
    */

   private LocalDateTime processTime;
}

4.2 Mapper层(MyBatis-Plus)

4.2.1 UserOriginMapper

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.UserOrigin;
import org.springframework.stereotype.Repository;

/**
* 用户原始表Mapper
* @author ken
*/

@Repository
public interface UserOriginMapper extends BaseMapper<UserOrigin> {
}

4.2.2 UserCleanMapper

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.UserClean;
import org.springframework.stereotype.Repository;

/**
* 用户清洗表Mapper
* @author ken
*/

@Repository
public interface UserCleanMapper extends BaseMapper<UserClean> {
}

4.3 ItemReader实战(数据读取)

ItemReader负责从数据源读取数据,Spring Batch提供多种开箱即用的实现:

  • JdbcCursorItemReader:数据库游标读取(适用于大数据量)
  • JdbcPagingItemReader:数据库分页读取
  • FlatFileItemReader:文件(CSV/Excel)读取
  • MyBatisPagingItemReader:MyBatis分页读取

本文使用MyBatisPagingItemReader(适配MyBatis-Plus,支持分页读取数据库数据):

package com.jam.demo.reader;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.UserOrigin;
import com.jam.demo.mapper.UserOriginMapper;
import lombok.RequiredArgsConstructor;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* 用户数据读取器配置
* @author ken
*/

@Configuration
@RequiredArgsConstructor
public class UserItemReaderConfig {
   private final UserOriginMapper userOriginMapper;
   
   /**
    * 构建MyBatis分页读取器
    * @return MyBatisPagingItemReader<UserOrigin>
    */

   @Bean
   public MyBatisPagingItemReader<UserOrigin> userOriginItemReader() {
       // 构建查询参数
       Map<String, Object> parameterValues = new HashMap<>(1);
       parameterValues.put("createTime", "2025-01-01 00:00:00");
       
       return new MyBatisPagingItemReaderBuilder<UserOrigin>()
               // 设置数据源(自动关联Spring数据源)
               .dataSource(userOriginMapper.getBaseMapper().getSqlSession().getConnection().getDataSource())
               // 设置查询语句(MyBatis映射语句ID)
               .queryId("com.jam.demo.mapper.UserOriginMapper.selectByCreateTime")
               // 设置参数
               .parameterValues(parameterValues)
               // 设置分页大小(每次读取10条)
               .pageSize(10)
               // 设置结果集映射
               .rowMapper((resultSet, i) -> {
                   UserOrigin userOrigin = new UserOrigin();
                   userOrigin.setId(resultSet.getLong("id"));
                   userOrigin.setUsername(resultSet.getString("username"));
                   userOrigin.setPhone(resultSet.getString("phone"));
                   userOrigin.setEmail(resultSet.getString("email"));
                   userOrigin.setCreateTime(resultSet.getTimestamp("createTime").toLocalDateTime());
                   return userOrigin;
               })
               .build();
   }
   
   /**
    * MyBatis映射语句(也可写在XML中)
    * 注:实际开发中建议放在UserOriginMapper.xml中
    */

   // 对应XML内容:
   /*
   <select id="selectByCreateTime" parameterType="java.util.Map" resultType="com.jam.demo.entity.UserOrigin">
       SELECT id, username, phone, email, create_time FROM user_origin
       WHERE create_time >= #{createTime}
       ORDER BY id ASC
   </select>
   */

}

4.4 ItemProcessor实战(数据处理)

ItemProcessor负责数据清洗、转换、过滤,核心逻辑:

  • 过滤无效数据(如邮箱为空)
  • 清洗错误数据(如邮箱格式错误)
  • 数据转换(如统一格式)

package com.jam.demo.processor;

import com.jam.demo.entity.UserClean;
import com.jam.demo.entity.UserOrigin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.regex.Pattern;

/**
* 用户数据处理器
* 负责数据清洗、过滤、转换
* @author ken
*/

@Slf4j
@Component
public class UserItemProcessor implements ItemProcessor<UserOrigin, UserClean> {
   /**
    * 邮箱正则表达式
    */

   private static final Pattern EMAIL_PATTERN = Pattern.compile("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$");
   
   /**
    * 处理数据
    * @param item 原始数据
    * @return 清洗后的数据(返回null则过滤该数据)
    * @throws Exception 处理异常
    */

   @Override
   public UserClean process(UserOrigin item) throws Exception {
       log.debug("开始处理用户数据:{}", item.getUsername());
       
       // 1. 过滤无效数据(手机号为空或用户名为空)
       if (StringUtils.isEmpty(item.getPhone()) || StringUtils.isEmpty(item.getUsername())) {
           log.warn("用户数据无效(手机号/用户名为空),跳过:{}", item.getId());
           return null;
       }
       
       // 2. 清洗邮箱数据
       String email = item.getEmail();
       if (StringUtils.isEmpty(email) || !EMAIL_PATTERN.matcher(email).matches()) {
           log.warn("邮箱格式错误,使用默认邮箱:{}", item.getId());
           email = item.getUsername() + "@default.com";
       }
       
       // 3. 转换为清洗后实体
       UserClean userClean = new UserClean();
       userClean.setUsername(item.getUsername());
       userClean.setPhone(item.getPhone());
       userClean.setEmail(email);
       
       log.debug("用户数据处理完成:{}", userClean.getUsername());
       return userClean;
   }
}

4.5 ItemWriter实战(数据写入)

ItemWriter负责将处理后的数据写入目标数据源,本文使用MyBatisBatchItemWriter(批量写入数据库):

package com.jam.demo.writer;

import com.jam.demo.entity.UserClean;
import com.jam.demo.mapper.UserCleanMapper;
import lombok.RequiredArgsConstructor;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 用户数据写入器配置
* @author ken
*/

@Configuration
@RequiredArgsConstructor
public class UserItemWriterConfig {
   private final UserCleanMapper userCleanMapper;
   
   /**
    * 构建MyBatis批量写入器
    * @return MyBatisBatchItemWriter<UserClean>
    */

   @Bean
   public MyBatisBatchItemWriter<UserClean> userCleanItemWriter() {
       return new MyBatisBatchItemWriterBuilder<UserClean>()
               // 设置SqlSessionFactory(自动关联MyBatis-Plus的SqlSessionFactory)
               .sqlSessionFactory(userCleanMapper.getBaseMapper().getSqlSession().getConfiguration().getEnvironment().getSqlSessionFactory())
               // 设置插入语句ID(MyBatis映射语句ID)
               .statementId("com.jam.demo.mapper.UserCleanMapper.insert")
               .build();
   }
}

4.6 Job与Step配置(核心流程定义)

package com.jam.demo.config;

import com.jam.demo.entity.UserClean;
import com.jam.demo.entity.UserOrigin;
import com.jam.demo.processor.UserItemProcessor;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

/**
* Spring Batch 核心配置(Job与Step定义)
* @author ken
*/

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchCoreConfig {
   /**
    * Job构建工厂
    */

   private final JobBuilderFactory jobBuilderFactory;
   
   /**
    * Step构建工厂
    */

   private final StepBuilderFactory stepBuilderFactory;
   
   /**
    * 事务管理器
    */

   private final PlatformTransactionManager platformTransactionManager;
   
   /**
    * 数据读取器
    */

   private final ItemReader<UserOrigin> userOriginItemReader;
   
   /**
    * 数据处理器
    */

   private final ItemProcessor<UserOrigin, UserClean> userItemProcessor;
   
   /**
    * 数据写入器
    */

   private final ItemWriter<UserClean> userCleanItemWriter;
   
   /**
    * 定义用户数据清洗Job
    * @return Job
    */

   @Bean
   public Job userDataCleanJob() {
       return jobBuilderFactory.get("userDataCleanJob")
               // 增量器(保证每次执行Job的ID唯一)
               .incrementer(new RunIdIncrementer())
               // 关联Step(一个Job可包含多个Step,按顺序执行)
               .flow(userDataCleanStep())
               // 结束Job
               .end()
               // 构建Job
               .build();
   }
   
   /**
    * 定义用户数据清洗Step
    * @return Step
    */

   @Bean
   public Step userDataCleanStep() {
       return stepBuilderFactory.get("userDataCleanStep")
               // 配置数据类型(输入类型、输出类型)
               .<UserOrigin, UserClean>chunk(10)
               // 设置事务管理器
               .transactionManager(platformTransactionManager)
               // 配置读取器
               .reader(userOriginItemReader)
               // 配置处理器
               .processor(userItemProcessor)
               // 配置写入器
               .writer(userCleanItemWriter)
               // 配置重试机制(异常重试3次,间隔1秒)
               .faultTolerant()
               .retryLimit(3)
               .retry(Exception.class)
               .build()
;
   }
}

4.7 JobLauncher实战(任务触发)

通过Spring Web接口触发Job执行,结合Swagger3提供接口文档:

package com.jam.demo.controller;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
* 批处理任务触发控制器
* @author ken
*/

@Slf4j
@RestController
@RequestMapping("/batch/job")
@RequiredArgsConstructor
@Tag(name = "批处理任务接口", description = "用于触发Spring Batch任务执行")
public class BatchJobController {
   /**
    * 任务启动器
    */

   private final JobLauncher jobLauncher;
   
   /**
    * 用户数据清洗任务
    */

   private final Job userDataCleanJob;
   
   /**
    * 触发用户数据清洗任务
    * @param createTime 筛选条件:创建时间(格式:yyyy-MM-dd HH:mm:ss)
    * @return 任务执行结果
    */

   @PostMapping("/user/clean")
   @Operation(summary = "用户数据清洗任务", description = "读取user_origin表数据,清洗后写入user_clean表")
   public String triggerUserDataCleanJob(
           @Parameter(description = "创建时间筛选条件", required = true)

           @RequestParam String createTime) {
       try {
           // 构建任务参数
           JobParameters jobParameters = new JobParametersBuilder()
                   // 时间参数(保证每次执行参数唯一,支持重复执行)
                   .addDate("executeTime", new Date())
                   // 业务参数(创建时间筛选)
                   .addString("createTime", createTime)
                   .toJobParameters();
           
           // 启动任务
           JobExecution jobExecution = jobLauncher.run(userDataCleanJob, jobParameters);
           
           // 返回执行结果
           return String.format("任务执行成功!任务ID:%s,执行状态:%s",
                   jobExecution.getJobId(), jobExecution.getStatus());
       } catch (Exception e) {
           log.error("任务执行失败", e);
           return "任务执行失败:" + e.getMessage();
       }
   }
}

4.8 事务管理实战(编程式事务)

Spring Batch默认支持声明式事务,本文按要求实现编程式事务,确保批处理数据一致性:

package com.jam.demo.service;

import com.jam.demo.entity.UserClean;
import com.jam.demo.mapper.UserCleanMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.List;

/**
* 用户数据写入服务(编程式事务)
* @author ken
*/

@Slf4j
@Service
@RequiredArgsConstructor
public class UserWriteService {
   private final UserCleanMapper userCleanMapper;
   private final TransactionTemplate transactionTemplate;
   
   /**
    * 批量写入用户清洗数据(编程式事务)
    * @param userCleanList 清洗后的数据列表
    */

   public void batchInsertUserClean(List<UserClean> userCleanList) {
       // 编程式事务:手动控制事务提交/回滚
       transactionTemplate.execute(new TransactionCallbackWithoutResult() {
           @Override
           protected void doInTransactionWithoutResult(TransactionStatus status) {
               try {
                   // 批量插入数据
                   userCleanMapper.batchInsert(userCleanList);
                   log.debug("批量插入{}条用户清洗数据成功", userCleanList.size());
               } catch (Exception e) {
                   log.error("批量插入用户清洗数据失败,触发事务回滚", e);
                   // 手动回滚事务
                   status.setRollbackOnly();
                   throw e;
               }
           }
       });
   }
}

修改ItemWriter,使用编程式事务服务:

// 替换UserItemWriterConfig中的userCleanItemWriter方法
@Bean
public ItemWriter<UserClean> userCleanItemWriter() {
   return items -> userWriteService.batchInsertUserClean(items);
}

五、进阶特性实战

5.1 分区处理(并行处理海量数据)

当数据量达到百万级以上时,单线程处理效率低下,可通过分区处理实现多线程并行处理。核心逻辑:将数据按条件分区(如按ID范围),每个分区由独立线程处理。

5.1.1 分区策略配置

package com.jam.demo.partition;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
* 按ID范围分区策略
* @author ken
*/

@Component
public class IdRangePartitioner implements Partitioner {
   /**
    * 分区方法
    * @param gridSize 分区数量
    * @param executionContext 执行上下文
    * @return 分区映射(分区名称 -> 分区参数)
    */

   @Override
   public Map<String, ExecutionContext> partition(int gridSize) {
       Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
       
       // 假设数据ID范围:1-10000,按gridSize分区
       long total = 10000;
       long partitionSize = total / gridSize;
       
       for (int i = 0; i < gridSize; i++) {
           ExecutionContext context = new ExecutionContext();
           // 计算每个分区的ID范围
           long startId = i * partitionSize + 1;
           long endId = (i == gridSize - 1) ? total : (i + 1) * partitionSize;
           
           context.putLong("startId", startId);
           context.putLong("endId", endId);
           partitions.put("partition-" + i, context);
       }
       
       return partitions;
   }
}

5.1.2 分区Step配置

@Bean
public Step partitionedUserDataCleanStep() {
   // 1. 定义从Step(每个分区执行的Step)
   Step slaveStep = stepBuilderFactory.get("slaveStep")
           .<UserOrigin, UserClean>chunk(10)
           .reader(slaveItemReader(null)) // 动态读取分区参数
           .processor(userItemProcessor)
           .writer(userCleanItemWriter)
           .build();
   
   // 2. 定义主Step(分区管理)
   return stepBuilderFactory.get("partitionedStep")
           .partitioner(slaveStep.getName(), idRangePartitioner)
           .step(slaveStep)
           .gridSize(5) // 分区数量(5个线程并行)
           .taskExecutor(new SimpleAsyncTaskExecutor()) // 异步任务执行器
           .build();
}

// 动态读取分区参数的ItemReader
@Bean
@StepScope //  StepScope:确保每个分区获取独立的Reader
public MyBatisPagingItemReader<UserOrigin> slaveItemReader(
       @Value("#{stepExecutionContext['startId']}")
Long startId,
       @Value("#{stepExecutionContext['endId']}") Long endId) {
   Map<String, Object> params = new HashMap<>(2);
   params.put("startId", startId);
   params.put("endId", endId);
   
   return new MyBatisPagingItemReaderBuilder<UserOrigin>()
           .dataSource(dataSource)
           .queryId("com.jam.demo.mapper.UserOriginMapper.selectByIdRange")
           .parameterValues(params)
           .pageSize(10)
           .rowMapper(rowMapper)
           .build();
}

5.2 重试与跳过机制

在批处理中,可能出现临时异常(如数据库连接超时),通过重试机制保证任务稳定性;对于不可恢复的异常(如数据格式错误),可配置跳过机制避免任务中断。

@Bean
public Step userDataCleanStepWithRetryAndSkip() {
   return stepBuilderFactory.get("userDataCleanStepWithRetryAndSkip")
           .<UserOrigin, UserClean>chunk(10)
           .reader(userOriginItemReader)
           .processor(userItemProcessor)
           .writer(userCleanItemWriter)
           // 重试机制:数据库异常重试3次,间隔1秒
           .faultTolerant()
           .retryLimit(3)
           .retry(SQLTransientConnectionException.class)
           .retryDelay(1000)
           // 跳过机制:数据格式异常跳过,最多跳过10条
           .skipLimit(10)
           .skip(DataFormatException.class)
           // 跳过监听器:记录跳过的数据
           .listener(new SkipListener<UserOrigin, UserClean>()
{
               @Override
               public void onSkipInRead(Throwable t) {
                   log.error("读取数据时跳过异常", t);
               }
               
               @Override
               public void onSkipInProcess(UserOrigin item, Throwable t) {
                   log.error("处理数据时跳过:{},原因:{}", item.getId(), t.getMessage());
               }
               
               @Override
               public void onSkipInWrite(UserClean item, Throwable t) {
                   log.error("写入数据时跳过:{},原因:{}", item.getUsername(), t.getMessage());
               }
           })
           .build();
}

5.3 任务监听器(监控任务生命周期)

通过监听器监控Job/Step的执行状态,实现日志记录、报警等功能:

package com.jam.demo.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

/**
* Job执行监听器
* @author ken
*/

@Slf4j
@Component
public class JobExecutionListenerImpl implements JobExecutionListener {
   /**
    * Job执行前
    * @param jobExecution 任务执行实例
    */

   @Override
   public void beforeJob(JobExecution jobExecution) {
       log.info("Job开始执行:{},参数:{}",
               jobExecution.getJobInstance().getJobName(),
               jobExecution.getJobParameters());
   }
   
   /**
    * Job执行后
    * @param jobExecution 任务执行实例
    */

   @Override
   public void afterJob(JobExecution jobExecution) {
       log.info("Job执行完成:{},状态:{},执行时间:{}ms",
               jobExecution.getJobInstance().getJobName(),
               jobExecution.getStatus(),
               jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime());
       
       // 失败报警
       if (jobExecution.getStatus().isUnsuccessful()) {
           log.error("Job执行失败,触发报警:{}", jobExecution.getFailureExceptions());
           // 此处可添加邮件/短信报警逻辑
       }
   }
}

将监听器添加到Job:

@Bean
public Job userDataCleanJob() {
   return jobBuilderFactory.get("userDataCleanJob")
           .incrementer(new RunIdIncrementer())
           .listener(jobExecutionListenerImpl) // 添加监听器
           .flow(userDataCleanStep())
           .end()
           .build();
}

六、完整实战案例测试

6.1 测试步骤

  1. 启动Spring Boot应用,访问http://localhost:8080/swagger-ui.html
  2. 在Swagger3界面中找到/batch/job/user/clean接口,输入参数createTime=2025-01-01 00:00:00
  3. 点击执行,触发批处理任务
  4. 查看日志,确认任务执行状态
  5. 查询数据库user_clean表,验证数据清洗结果

6.2 预期结果

  • 原始数据中邮箱为空的赵六,清洗后邮箱为赵六@default.com
  • 原始数据中邮箱格式错误的王五,清洗后邮箱为王五@default.com
  • 其他数据正常写入user_clean
  • 任务执行日志中记录每个步骤的执行状态、处理数据量

6.3 常见问题排查

  1. 任务无法启动:检查spring.batch.job.enabled是否为false,避免自动执行
  2. 数据读取失败:检查数据库连接配置、Mapper映射语句是否正确
  3. 事务回滚:检查数据写入逻辑是否存在异常,查看日志中的错误信息
  4. 重复执行:确保每次执行的JobParameters唯一(如添加时间参数)

七、核心技术点总结与权威依据

7.1 核心技术点总结

  1. Spring Batch核心组件:Job、Step、ItemReader/Processor/Writer、JobRepository
  2. 批处理流程:启动Job→执行Step→分块读取-处理-写入→事务提交
  3. 进阶特性:分区并行处理、重试与跳过、监听器、编程式事务
  4. 实战技巧:结合MyBatis-Plus实现数据读写,结合Swagger3实现接口触发

7.2 权威依据

  1. Spring Batch官方文档:https://docs.spring.io/spring-batch/docs/current/reference/html/
  2. Spring Boot官方文档:https://docs.spring.io/spring-boot/docs/current/reference/html/
  3. MyBatis-Plus官方文档:https://baomidou.com/pages/24112f/

八、企业级应用最佳实践

  1. 任务拆分:将复杂任务拆分为多个Step,便于维护和监控
  2. 参数化配置:通过JobParameters传递业务参数,提高任务灵活性
  3. 状态监控:利用JobRepository存储的元数据,实现任务状态可视化
  4. 资源控制:并行处理时控制线程数,避免数据库连接耗尽
  5. 失败处理:结合重试、跳过机制,配合报警功能,提高任务稳定性
  6. 性能优化:分块大小合理设置(建议10-100),避免大事务;海量数据使用分区处理

九、总结

Spring Batch作为企业级批处理的首选框架,其核心优势在于轻量级、可扩展、事务安全。本文从底层架构到实战落地,全面解析了Spring Batch的核心组件、执行流程、进阶特性,结合可直接运行的实例,覆盖了数据读取、处理、写入、事务管理、并行处理等关键场景。

通过本文的学习,你可以快速掌握企业级批处理开发能力,解决数据迁移、清洗、同步等实际问题。在实际应用中,需结合业务场景合理设计Job和Step,利用进阶特性优化性能,确保任务稳定高效执行。

目录
相关文章
|
监控 Java 数据处理
【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解
【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解
|
存储 自然语言处理 数据库
使用Elasticsearch映射定义索引结构
使用Elasticsearch映射定义索引结构
262 0
|
5月前
|
监控 Kubernetes Cloud Native
Spring Batch 批处理框架技术详解与实践指南
本文档全面介绍 Spring Batch 批处理框架的核心架构、关键组件和实际应用场景。作为 Spring 生态系统中专门处理大规模数据批处理的框架,Spring Batch 为企业级批处理作业提供了可靠的解决方案。本文将深入探讨其作业流程、组件模型、错误处理机制、性能优化策略以及与现代云原生环境的集成方式,帮助开发者构建高效、稳定的批处理系统。
603 1
|
19天前
|
人工智能 自然语言处理 Java
Spring AI Alibaba实战:从0到1构建企业级智能应用
本文介绍了基于SpringAI Alibaba框架开发AI原生应用的实战指南。文章首先分析了SpringAI Alibaba作为SpringAI本土化版本的核心优势,包括深度适配阿里云生态、中文语境优化等特性。随后详细讲解了开发环境的搭建过程,包括JDK17、SpringBoot3.2.2等技术栈的配置。通过三个实战案例展示了核心功能实现:基础文本生成、结合MyBatisPlus的智能问答系统、以及流式响应和函数调用等高级特性。
767 5
|
Java Maven
java项目中jar启动执行日志报错:no main manifest attribute, in /www/wwwroot/snow-server/z-server.jar-jar打包的大小明显小于正常大小如何解决
在Java项目中,启动jar包时遇到“no main manifest attribute”错误,且打包大小明显偏小。常见原因包括:1) Maven配置中跳过主程序打包;2) 缺少Manifest文件或Main-Class属性。解决方案如下:
2979 8
java项目中jar启动执行日志报错:no main manifest attribute, in /www/wwwroot/snow-server/z-server.jar-jar打包的大小明显小于正常大小如何解决
|
存储 Java 网络架构
Spring Boot中如何实现批量处理
Spring Boot中如何实现批量处理
|
Linux 网络安全 文件存储
docker中使用opwrt详解
在docker中配置opwrt
1023 5
Java系列之 IDEA 为类 和 方法设置注解模板
这篇文章介绍了如何在IntelliJ IDEA中为类和方法设置注解模板,包括类模板的创建和应用,以及两种不同的方法注解模板的创建过程和实际效果展示,旨在提高代码的可读性和维护性。
|
存储 消息中间件 JSON
DDD基础教程:一文带你读懂DDD分层架构
DDD基础教程:一文带你读懂DDD分层架构