在企业级应用开发中,批处理场景无处不在——数据迁移、报表生成、日志分析、数据清洗同步等。这些场景的核心需求是高效处理海量数据、保证数据一致性、支持失败重试与断点续跑。Spring Batch作为Spring生态专为批处理设计的框架,凭借其轻量级、可扩展、事务安全的特性,成为企业批处理方案的首选。本文将从底层逻辑到实战落地,全面拆解Spring Batch,结合可直接运行的实例,让你快速掌握企业级批处理开发能力。
一、批处理核心概念与Spring Batch优势
1.1 批处理的核心特征
批处理是指批量处理大量数据的无交互式操作,典型特征包括:
- 无交互性:无需人工干预,自动完成数据处理
- 海量数据:单次处理数据量从万级到亿级不等
- 可靠性:支持失败重试、断点续跑,避免数据丢失或重复
- 高效性:支持并行处理、分区处理,提升处理速度
- 事务性:保证批量操作的原子性,要么全部成功,要么全部回滚
1.2 Spring Batch的核心优势
Spring Batch基于Spring框架开发,继承了Spring的依赖注入、AOP等特性,同时针对批处理场景做了深度优化,核心优势如下:
- 轻量化:核心依赖小,易于集成到Spring Boot项目
- 可扩展性:支持自定义ItemReader/Processor/Writer、自定义监听器等,适配复杂场景
- 事务安全:内置事务管理机制,保证批处理数据一致性
- 丰富的组件:提供开箱即用的Reader(数据库、文件、消息队列)、Writer,减少重复开发
- 监控与运维:支持批处理任务的状态跟踪、日志记录、失败报警
- 并行与分区:支持多线程并行处理、多节点分区处理,应对海量数据
- 兼容性:兼容Spring Boot、MyBatis-Plus、MySQL等主流技术栈
1.3 Spring Batch与其他批处理方案对比
| 方案 | 优势 | 劣势 | 适用场景 |
| Spring Batch | 生态完善、可扩展强、事务安全 | 需额外配置,学习成本中等 | 企业级复杂批处理场景 |
| 原生Java线程 | 实现简单、轻量 | 无事务管理、无重试机制 | 简单小规模批处理 |
| Quartz | 定时能力强 | 批处理核心能力薄弱 | 定时触发场景,需配合批处理框架 |
| Flink/Spark | 分布式处理能力强 | 重量级、部署复杂 | 超大规模数据批处理/流批一体 |
二、Spring Batch核心架构与底层逻辑
2.1 核心架构图
2.2 核心组件职责拆解
1. JobLauncher:任务启动器
- 核心职责:接收任务启动请求,触发Job执行
- 底层逻辑:通过JobRepository获取Job的配置信息,创建JobExecution执行实例,触发Step执行
- 适用场景:程序调用、定时任务触发(结合Quartz/Spring Scheduler)
2. Job:批处理任务
- 核心职责:定义批处理任务的整体流程,包含一个或多个Step
- 底层逻辑:Job是逻辑上的任务单元,通过JobParameters传递参数,JobExecution记录执行状态(成功/失败/运行中)
- 关键特性:支持重启(restartable)、参数校验、任务依赖
3. Step:任务步骤
- 核心职责:批处理的最小执行单元,包含数据读取(Reader)、处理(Processor)、写入(Writer)的完整流程
- 底层逻辑:Step通过Chunk(块)处理数据,即读取一定数量的记录后批量处理并写入,减少IO开销;支持事务管理、重试、跳过
- 两种执行模式:
- Chunk-oriented:适用于海量数据,分块处理
- Tasklet-oriented:适用于简单任务(如创建目录、执行SQL)
4. ItemReader/ItemProcessor/ItemWriter:数据处理三组件
- ItemReader:数据读取组件,从数据源(数据库、文件、CSV、Excel、消息队列)读取数据
- ItemProcessor:数据处理组件,实现数据清洗、转换、过滤、校验等逻辑
- ItemWriter:数据写入组件,将处理后的数据写入目标数据源(数据库、文件、缓存)
5. JobRepository:任务仓库
- 核心职责:持久化存储Job、Step的执行状态、参数、结果等信息
- 底层逻辑:通过数据库表(如BATCH_JOB_INSTANCE、BATCH_JOB_EXECUTION)存储元数据,支持断点续跑、状态监控
- 支持存储:MySQL、Oracle、PostgreSQL等关系型数据库,也支持内存(仅测试用)
6. TransactionManager:事务管理器
- 核心职责:保证Step处理的事务一致性,支持Chunk级别的事务提交
- 底层逻辑:默认绑定Spring的PlatformTransactionManager,每个Chunk的处理作为一个事务,失败则回滚整个Chunk
2.3 批处理执行流程(底层逻辑)
三、环境搭建(基于JDK17+Spring Boot3+Maven)
3.1 核心依赖(pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/>
</parent>
<groupId>com.jam.demo</groupId>
<artifactId>spring-batch-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-batch-demo</name>
<description>Spring Batch 实战演示项目</description>
<properties>
<java.version>17</java.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<fastjson2.version>2.0.49</fastjson2.version>
<lombok.version>1.18.30</lombok.version>
</properties>
<dependencies>
<!-- Spring Boot 核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Batch 核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- 数据库依赖 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- MyBatis-Plus 依赖 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- Lombok 依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- FastJSON2 依赖 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<!-- Spring Web 依赖(用于接口触发任务) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Swagger3 依赖 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Google Guava 集合工具 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.1-jre</version>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.2 配置文件(application.yml)
spring:
# 数据源配置
datasource:
url: jdbc:mysql://localhost:3306/spring_batch_demo?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
# Spring Batch 配置
batch:
# 自动执行开关(默认true,测试时可关闭)
job:
enabled: false
# 数据库初始化模式(默认EMBEDDED,生产环境用ALWAYS)
jdbc:
initialize-schema: ALWAYS
# MyBatis-Plus 配置
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.jam.demo.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# 日志配置
logging:
level:
root: info
com.jam.demo: debug
org.springframework.batch: info
# 服务器配置
server:
port: 8080
# Swagger3 配置
springdoc:
api-docs:
path: /api-docs
swagger-ui:
path: /swagger-ui.html
operationsSorter: method
3.3 数据库初始化(MySQL8.0)
Spring Batch需要初始化元数据表存储任务状态,通过spring.batch.jdbc.initialize-schema: ALWAYS自动初始化,核心表说明:
- BATCH_JOB_INSTANCE:任务实例表,存储Job的唯一实例
- BATCH_JOB_EXECUTION:任务执行表,存储Job的每次执行记录
- BATCH_STEP_EXECUTION:步骤执行表,存储Step的每次执行记录
- BATCH_JOB_PARAMS:任务参数表,存储Job执行时的参数
同时创建业务表(本文实战案例用:用户原始表、用户清洗表):
-- 用户原始表(待处理数据)
CREATE TABLE `user_origin` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`username` varchar(50) NOT NULL COMMENT '用户名',
`phone` varchar(20) DEFAULT NULL COMMENT '手机号',
`email` varchar(100) DEFAULT NULL COMMENT '邮箱',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户原始表';
-- 用户清洗表(处理后数据)
CREATE TABLE `user_clean` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`username` varchar(50) NOT NULL COMMENT '用户名',
`phone` varchar(20) NOT NULL COMMENT '手机号(清洗后)',
`email` varchar(100) NOT NULL COMMENT '邮箱(清洗后)',
`process_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '处理时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户清洗表';
-- 插入测试数据
INSERT INTO `user_origin` (`username`, `phone`, `email`, `create_time`)
VALUES
('张三', '13800138000', 'zhangsan@test.com', '2025-01-01 10:00:00'),
('李四', '13900139000', 'lisi@test.com', '2025-01-01 10:00:00'),
('王五', '13700137000', 'wangwu#test.com', '2025-01-01 10:00:00'), -- 邮箱格式错误
('赵六', '13600136000', '', '2025-01-01 10:00:00'), -- 邮箱为空
('孙七', '13500135000', 'sunqi@test.com', '2025-01-01 10:00:00');
四、核心组件实战(从基础到进阶)
4.1 实体类定义
4.1.1 UserOrigin(用户原始表实体)
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 用户原始表实体
* @author ken
*/
@Data
@TableName("user_origin")
public class UserOrigin {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户名
*/
private String username;
/**
* 手机号
*/
private String phone;
/**
* 邮箱
*/
private String email;
/**
* 创建时间
*/
private LocalDateTime createTime;
}
4.1.2 UserClean(用户清洗表实体)
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 用户清洗表实体
* @author ken
*/
@Data
@TableName("user_clean")
public class UserClean {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户名
*/
private String username;
/**
* 手机号(清洗后)
*/
private String phone;
/**
* 邮箱(清洗后)
*/
private String email;
/**
* 处理时间
*/
private LocalDateTime processTime;
}
4.2 Mapper层(MyBatis-Plus)
4.2.1 UserOriginMapper
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.UserOrigin;
import org.springframework.stereotype.Repository;
/**
* 用户原始表Mapper
* @author ken
*/
@Repository
public interface UserOriginMapper extends BaseMapper<UserOrigin> {
}
4.2.2 UserCleanMapper
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.UserClean;
import org.springframework.stereotype.Repository;
/**
* 用户清洗表Mapper
* @author ken
*/
@Repository
public interface UserCleanMapper extends BaseMapper<UserClean> {
}
4.3 ItemReader实战(数据读取)
ItemReader负责从数据源读取数据,Spring Batch提供多种开箱即用的实现:
- JdbcCursorItemReader:数据库游标读取(适用于大数据量)
- JdbcPagingItemReader:数据库分页读取
- FlatFileItemReader:文件(CSV/Excel)读取
- MyBatisPagingItemReader:MyBatis分页读取
本文使用MyBatisPagingItemReader(适配MyBatis-Plus,支持分页读取数据库数据):
package com.jam.demo.reader;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.UserOrigin;
import com.jam.demo.mapper.UserOriginMapper;
import lombok.RequiredArgsConstructor;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 用户数据读取器配置
* @author ken
*/
@Configuration
@RequiredArgsConstructor
public class UserItemReaderConfig {
private final UserOriginMapper userOriginMapper;
/**
* 构建MyBatis分页读取器
* @return MyBatisPagingItemReader<UserOrigin>
*/
@Bean
public MyBatisPagingItemReader<UserOrigin> userOriginItemReader() {
// 构建查询参数
Map<String, Object> parameterValues = new HashMap<>(1);
parameterValues.put("createTime", "2025-01-01 00:00:00");
return new MyBatisPagingItemReaderBuilder<UserOrigin>()
// 设置数据源(自动关联Spring数据源)
.dataSource(userOriginMapper.getBaseMapper().getSqlSession().getConnection().getDataSource())
// 设置查询语句(MyBatis映射语句ID)
.queryId("com.jam.demo.mapper.UserOriginMapper.selectByCreateTime")
// 设置参数
.parameterValues(parameterValues)
// 设置分页大小(每次读取10条)
.pageSize(10)
// 设置结果集映射
.rowMapper((resultSet, i) -> {
UserOrigin userOrigin = new UserOrigin();
userOrigin.setId(resultSet.getLong("id"));
userOrigin.setUsername(resultSet.getString("username"));
userOrigin.setPhone(resultSet.getString("phone"));
userOrigin.setEmail(resultSet.getString("email"));
userOrigin.setCreateTime(resultSet.getTimestamp("createTime").toLocalDateTime());
return userOrigin;
})
.build();
}
/**
* MyBatis映射语句(也可写在XML中)
* 注:实际开发中建议放在UserOriginMapper.xml中
*/
// 对应XML内容:
/*
<select id="selectByCreateTime" parameterType="java.util.Map" resultType="com.jam.demo.entity.UserOrigin">
SELECT id, username, phone, email, create_time FROM user_origin
WHERE create_time >= #{createTime}
ORDER BY id ASC
</select>
*/
}
4.4 ItemProcessor实战(数据处理)
ItemProcessor负责数据清洗、转换、过滤,核心逻辑:
- 过滤无效数据(如邮箱为空)
- 清洗错误数据(如邮箱格式错误)
- 数据转换(如统一格式)
package com.jam.demo.processor;
import com.jam.demo.entity.UserClean;
import com.jam.demo.entity.UserOrigin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.regex.Pattern;
/**
* 用户数据处理器
* 负责数据清洗、过滤、转换
* @author ken
*/
@Slf4j
@Component
public class UserItemProcessor implements ItemProcessor<UserOrigin, UserClean> {
/**
* 邮箱正则表达式
*/
private static final Pattern EMAIL_PATTERN = Pattern.compile("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$");
/**
* 处理数据
* @param item 原始数据
* @return 清洗后的数据(返回null则过滤该数据)
* @throws Exception 处理异常
*/
@Override
public UserClean process(UserOrigin item) throws Exception {
log.debug("开始处理用户数据:{}", item.getUsername());
// 1. 过滤无效数据(手机号为空或用户名为空)
if (StringUtils.isEmpty(item.getPhone()) || StringUtils.isEmpty(item.getUsername())) {
log.warn("用户数据无效(手机号/用户名为空),跳过:{}", item.getId());
return null;
}
// 2. 清洗邮箱数据
String email = item.getEmail();
if (StringUtils.isEmpty(email) || !EMAIL_PATTERN.matcher(email).matches()) {
log.warn("邮箱格式错误,使用默认邮箱:{}", item.getId());
email = item.getUsername() + "@default.com";
}
// 3. 转换为清洗后实体
UserClean userClean = new UserClean();
userClean.setUsername(item.getUsername());
userClean.setPhone(item.getPhone());
userClean.setEmail(email);
log.debug("用户数据处理完成:{}", userClean.getUsername());
return userClean;
}
}
4.5 ItemWriter实战(数据写入)
ItemWriter负责将处理后的数据写入目标数据源,本文使用MyBatisBatchItemWriter(批量写入数据库):
package com.jam.demo.writer;
import com.jam.demo.entity.UserClean;
import com.jam.demo.mapper.UserCleanMapper;
import lombok.RequiredArgsConstructor;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 用户数据写入器配置
* @author ken
*/
@Configuration
@RequiredArgsConstructor
public class UserItemWriterConfig {
private final UserCleanMapper userCleanMapper;
/**
* 构建MyBatis批量写入器
* @return MyBatisBatchItemWriter<UserClean>
*/
@Bean
public MyBatisBatchItemWriter<UserClean> userCleanItemWriter() {
return new MyBatisBatchItemWriterBuilder<UserClean>()
// 设置SqlSessionFactory(自动关联MyBatis-Plus的SqlSessionFactory)
.sqlSessionFactory(userCleanMapper.getBaseMapper().getSqlSession().getConfiguration().getEnvironment().getSqlSessionFactory())
// 设置插入语句ID(MyBatis映射语句ID)
.statementId("com.jam.demo.mapper.UserCleanMapper.insert")
.build();
}
}
4.6 Job与Step配置(核心流程定义)
package com.jam.demo.config;
import com.jam.demo.entity.UserClean;
import com.jam.demo.entity.UserOrigin;
import com.jam.demo.processor.UserItemProcessor;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
/**
* Spring Batch 核心配置(Job与Step定义)
* @author ken
*/
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchCoreConfig {
/**
* Job构建工厂
*/
private final JobBuilderFactory jobBuilderFactory;
/**
* Step构建工厂
*/
private final StepBuilderFactory stepBuilderFactory;
/**
* 事务管理器
*/
private final PlatformTransactionManager platformTransactionManager;
/**
* 数据读取器
*/
private final ItemReader<UserOrigin> userOriginItemReader;
/**
* 数据处理器
*/
private final ItemProcessor<UserOrigin, UserClean> userItemProcessor;
/**
* 数据写入器
*/
private final ItemWriter<UserClean> userCleanItemWriter;
/**
* 定义用户数据清洗Job
* @return Job
*/
@Bean
public Job userDataCleanJob() {
return jobBuilderFactory.get("userDataCleanJob")
// 增量器(保证每次执行Job的ID唯一)
.incrementer(new RunIdIncrementer())
// 关联Step(一个Job可包含多个Step,按顺序执行)
.flow(userDataCleanStep())
// 结束Job
.end()
// 构建Job
.build();
}
/**
* 定义用户数据清洗Step
* @return Step
*/
@Bean
public Step userDataCleanStep() {
return stepBuilderFactory.get("userDataCleanStep")
// 配置数据类型(输入类型、输出类型)
.<UserOrigin, UserClean>chunk(10)
// 设置事务管理器
.transactionManager(platformTransactionManager)
// 配置读取器
.reader(userOriginItemReader)
// 配置处理器
.processor(userItemProcessor)
// 配置写入器
.writer(userCleanItemWriter)
// 配置重试机制(异常重试3次,间隔1秒)
.faultTolerant()
.retryLimit(3)
.retry(Exception.class)
.build();
}
}
4.7 JobLauncher实战(任务触发)
通过Spring Web接口触发Job执行,结合Swagger3提供接口文档:
package com.jam.demo.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* 批处理任务触发控制器
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/batch/job")
@RequiredArgsConstructor
@Tag(name = "批处理任务接口", description = "用于触发Spring Batch任务执行")
public class BatchJobController {
/**
* 任务启动器
*/
private final JobLauncher jobLauncher;
/**
* 用户数据清洗任务
*/
private final Job userDataCleanJob;
/**
* 触发用户数据清洗任务
* @param createTime 筛选条件:创建时间(格式:yyyy-MM-dd HH:mm:ss)
* @return 任务执行结果
*/
@PostMapping("/user/clean")
@Operation(summary = "用户数据清洗任务", description = "读取user_origin表数据,清洗后写入user_clean表")
public String triggerUserDataCleanJob(
@Parameter(description = "创建时间筛选条件", required = true)
@RequestParam String createTime) {
try {
// 构建任务参数
JobParameters jobParameters = new JobParametersBuilder()
// 时间参数(保证每次执行参数唯一,支持重复执行)
.addDate("executeTime", new Date())
// 业务参数(创建时间筛选)
.addString("createTime", createTime)
.toJobParameters();
// 启动任务
JobExecution jobExecution = jobLauncher.run(userDataCleanJob, jobParameters);
// 返回执行结果
return String.format("任务执行成功!任务ID:%s,执行状态:%s",
jobExecution.getJobId(), jobExecution.getStatus());
} catch (Exception e) {
log.error("任务执行失败", e);
return "任务执行失败:" + e.getMessage();
}
}
}
4.8 事务管理实战(编程式事务)
Spring Batch默认支持声明式事务,本文按要求实现编程式事务,确保批处理数据一致性:
package com.jam.demo.service;
import com.jam.demo.entity.UserClean;
import com.jam.demo.mapper.UserCleanMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.List;
/**
* 用户数据写入服务(编程式事务)
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class UserWriteService {
private final UserCleanMapper userCleanMapper;
private final TransactionTemplate transactionTemplate;
/**
* 批量写入用户清洗数据(编程式事务)
* @param userCleanList 清洗后的数据列表
*/
public void batchInsertUserClean(List<UserClean> userCleanList) {
// 编程式事务:手动控制事务提交/回滚
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
try {
// 批量插入数据
userCleanMapper.batchInsert(userCleanList);
log.debug("批量插入{}条用户清洗数据成功", userCleanList.size());
} catch (Exception e) {
log.error("批量插入用户清洗数据失败,触发事务回滚", e);
// 手动回滚事务
status.setRollbackOnly();
throw e;
}
}
});
}
}
修改ItemWriter,使用编程式事务服务:
// 替换UserItemWriterConfig中的userCleanItemWriter方法
@Bean
public ItemWriter<UserClean> userCleanItemWriter() {
return items -> userWriteService.batchInsertUserClean(items);
}
五、进阶特性实战
5.1 分区处理(并行处理海量数据)
当数据量达到百万级以上时,单线程处理效率低下,可通过分区处理实现多线程并行处理。核心逻辑:将数据按条件分区(如按ID范围),每个分区由独立线程处理。
5.1.1 分区策略配置
package com.jam.demo.partition;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 按ID范围分区策略
* @author ken
*/
@Component
public class IdRangePartitioner implements Partitioner {
/**
* 分区方法
* @param gridSize 分区数量
* @param executionContext 执行上下文
* @return 分区映射(分区名称 -> 分区参数)
*/
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
// 假设数据ID范围:1-10000,按gridSize分区
long total = 10000;
long partitionSize = total / gridSize;
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
// 计算每个分区的ID范围
long startId = i * partitionSize + 1;
long endId = (i == gridSize - 1) ? total : (i + 1) * partitionSize;
context.putLong("startId", startId);
context.putLong("endId", endId);
partitions.put("partition-" + i, context);
}
return partitions;
}
}
5.1.2 分区Step配置
@Bean
public Step partitionedUserDataCleanStep() {
// 1. 定义从Step(每个分区执行的Step)
Step slaveStep = stepBuilderFactory.get("slaveStep")
.<UserOrigin, UserClean>chunk(10)
.reader(slaveItemReader(null)) // 动态读取分区参数
.processor(userItemProcessor)
.writer(userCleanItemWriter)
.build();
// 2. 定义主Step(分区管理)
return stepBuilderFactory.get("partitionedStep")
.partitioner(slaveStep.getName(), idRangePartitioner)
.step(slaveStep)
.gridSize(5) // 分区数量(5个线程并行)
.taskExecutor(new SimpleAsyncTaskExecutor()) // 异步任务执行器
.build();
}
// 动态读取分区参数的ItemReader
@Bean
@StepScope // StepScope:确保每个分区获取独立的Reader
public MyBatisPagingItemReader<UserOrigin> slaveItemReader(
@Value("#{stepExecutionContext['startId']}") Long startId,
@Value("#{stepExecutionContext['endId']}") Long endId) {
Map<String, Object> params = new HashMap<>(2);
params.put("startId", startId);
params.put("endId", endId);
return new MyBatisPagingItemReaderBuilder<UserOrigin>()
.dataSource(dataSource)
.queryId("com.jam.demo.mapper.UserOriginMapper.selectByIdRange")
.parameterValues(params)
.pageSize(10)
.rowMapper(rowMapper)
.build();
}
5.2 重试与跳过机制
在批处理中,可能出现临时异常(如数据库连接超时),通过重试机制保证任务稳定性;对于不可恢复的异常(如数据格式错误),可配置跳过机制避免任务中断。
@Bean
public Step userDataCleanStepWithRetryAndSkip() {
return stepBuilderFactory.get("userDataCleanStepWithRetryAndSkip")
.<UserOrigin, UserClean>chunk(10)
.reader(userOriginItemReader)
.processor(userItemProcessor)
.writer(userCleanItemWriter)
// 重试机制:数据库异常重试3次,间隔1秒
.faultTolerant()
.retryLimit(3)
.retry(SQLTransientConnectionException.class)
.retryDelay(1000)
// 跳过机制:数据格式异常跳过,最多跳过10条
.skipLimit(10)
.skip(DataFormatException.class)
// 跳过监听器:记录跳过的数据
.listener(new SkipListener<UserOrigin, UserClean>() {
@Override
public void onSkipInRead(Throwable t) {
log.error("读取数据时跳过异常", t);
}
@Override
public void onSkipInProcess(UserOrigin item, Throwable t) {
log.error("处理数据时跳过:{},原因:{}", item.getId(), t.getMessage());
}
@Override
public void onSkipInWrite(UserClean item, Throwable t) {
log.error("写入数据时跳过:{},原因:{}", item.getUsername(), t.getMessage());
}
})
.build();
}
5.3 任务监听器(监控任务生命周期)
通过监听器监控Job/Step的执行状态,实现日志记录、报警等功能:
package com.jam.demo.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
/**
* Job执行监听器
* @author ken
*/
@Slf4j
@Component
public class JobExecutionListenerImpl implements JobExecutionListener {
/**
* Job执行前
* @param jobExecution 任务执行实例
*/
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("Job开始执行:{},参数:{}",
jobExecution.getJobInstance().getJobName(),
jobExecution.getJobParameters());
}
/**
* Job执行后
* @param jobExecution 任务执行实例
*/
@Override
public void afterJob(JobExecution jobExecution) {
log.info("Job执行完成:{},状态:{},执行时间:{}ms",
jobExecution.getJobInstance().getJobName(),
jobExecution.getStatus(),
jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime());
// 失败报警
if (jobExecution.getStatus().isUnsuccessful()) {
log.error("Job执行失败,触发报警:{}", jobExecution.getFailureExceptions());
// 此处可添加邮件/短信报警逻辑
}
}
}
将监听器添加到Job:
@Bean
public Job userDataCleanJob() {
return jobBuilderFactory.get("userDataCleanJob")
.incrementer(new RunIdIncrementer())
.listener(jobExecutionListenerImpl) // 添加监听器
.flow(userDataCleanStep())
.end()
.build();
}
六、完整实战案例测试
6.1 测试步骤
- 启动Spring Boot应用,访问http://localhost:8080/swagger-ui.html
- 在Swagger3界面中找到
/batch/job/user/clean接口,输入参数createTime=2025-01-01 00:00:00 - 点击执行,触发批处理任务
- 查看日志,确认任务执行状态
- 查询数据库
user_clean表,验证数据清洗结果
6.2 预期结果
- 原始数据中邮箱为空的赵六,清洗后邮箱为
赵六@default.com - 原始数据中邮箱格式错误的王五,清洗后邮箱为
王五@default.com - 其他数据正常写入
user_clean表 - 任务执行日志中记录每个步骤的执行状态、处理数据量
6.3 常见问题排查
- 任务无法启动:检查
spring.batch.job.enabled是否为false,避免自动执行 - 数据读取失败:检查数据库连接配置、Mapper映射语句是否正确
- 事务回滚:检查数据写入逻辑是否存在异常,查看日志中的错误信息
- 重复执行:确保每次执行的
JobParameters唯一(如添加时间参数)
七、核心技术点总结与权威依据
7.1 核心技术点总结
- Spring Batch核心组件:Job、Step、ItemReader/Processor/Writer、JobRepository
- 批处理流程:启动Job→执行Step→分块读取-处理-写入→事务提交
- 进阶特性:分区并行处理、重试与跳过、监听器、编程式事务
- 实战技巧:结合MyBatis-Plus实现数据读写,结合Swagger3实现接口触发
7.2 权威依据
- Spring Batch官方文档:https://docs.spring.io/spring-batch/docs/current/reference/html/
- Spring Boot官方文档:https://docs.spring.io/spring-boot/docs/current/reference/html/
- MyBatis-Plus官方文档:https://baomidou.com/pages/24112f/
八、企业级应用最佳实践
- 任务拆分:将复杂任务拆分为多个Step,便于维护和监控
- 参数化配置:通过
JobParameters传递业务参数,提高任务灵活性 - 状态监控:利用JobRepository存储的元数据,实现任务状态可视化
- 资源控制:并行处理时控制线程数,避免数据库连接耗尽
- 失败处理:结合重试、跳过机制,配合报警功能,提高任务稳定性
- 性能优化:分块大小合理设置(建议10-100),避免大事务;海量数据使用分区处理
九、总结
Spring Batch作为企业级批处理的首选框架,其核心优势在于轻量级、可扩展、事务安全。本文从底层架构到实战落地,全面解析了Spring Batch的核心组件、执行流程、进阶特性,结合可直接运行的实例,覆盖了数据读取、处理、写入、事务管理、并行处理等关键场景。
通过本文的学习,你可以快速掌握企业级批处理开发能力,解决数据迁移、清洗、同步等实际问题。在实际应用中,需结合业务场景合理设计Job和Step,利用进阶特性优化性能,确保任务稳定高效执行。