Spring Batch 批处理框架,真心强啊!!(二)

简介: Spring Batch 批处理框架,真心强啊!!(二)

什么是ExecutionContext

ExecutionContext即每一个StepExecution 的执行环境。它包含一系列的键值对。我们可以用如下代码获取ExecutionContext

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

什么是JobRepository

JobRepository是一个用于将上述job,step等概念进行持久化的一个类。它同时给Job和Step以及下文会提到的JobLauncher实现提供CRUD操作。首次启动Job时,将从repository中获取JobExecution,并且在执行批处理的过程中,StepExecution和JobExecution将被存储到repository当中。

@EnableBatchProcessing注解可以为JobRepository提供自动配置。

什么是JobLauncher

JobLauncher这个接口的功能非常简单,它是用于启动指定了JobParameters的Job,为什么这里要强调指定了JobParameter,原因其实我们在前面已经提到了,jobparameter和job一起才能组成一次job的执行。下面是代码实例:

public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

上面run方法实现的功能是根据传入的job以及jobparamaters从JobRepository获取一个JobExecution并执行Job。

什么是Item Reader

ItemReader是一个读数据的抽象,它的功能是为每一个Step提供数据输入。当ItemReader以及读完所有数据时,它会返回null来告诉后续操作数据已经读完。Spring Batch为ItemReader提供了非常多的有用的实现类,比如JdbcPagingItemReader,JdbcCursorItemReader等等。

ItemReader支持的读入的数据源也是非常丰富的,包括各种类型的数据库,文件,数据流,等等。几乎涵盖了我们的所有场景。

下面是一个JdbcPagingItemReader的例子代码:

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");
        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .dataSource(dataSource)
                                           .queryProvider(queryProvider)
                                           .parameterValues(parameterValues)
                                           .rowMapper(customerCreditMapper())
                                           .pageSize(1000)
                                           .build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");
        return provider;
}

JdbcPagingItemReader必须指定一个PagingQueryProvider,负责提供SQL查询语句来按分页返回数据。

下面是一个JdbcCursorItemReader的例子代码:

private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
            String tenant) {
        JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        itemReader.setSql("sql here");
        itemReader.setRowMapper(new RowMapper());
        return itemReader;
    }

什么是Item Writer

既然ItemReader是读数据的一个抽象,那么ItemWriter自然就是一个写数据的抽象,它是为每一个step提供数据写出的功能。写的单位是可以配置的,我们可以一次写一条数据,也可以一次写一个chunk的数据,关于chunk下文会有专门的介绍。ItemWriter对于读入的数据是不能做任何操作的。

Spring Batch为ItemWriter也提供了非常多的有用的实现类,当然我们也可以去实现自己的writer功能。

什么是Item Processor

ItemProcessor对项目的业务逻辑处理的一个抽象, 当ItemReader读取到一条记录之后,ItemWriter还未写入这条记录之前,I我们可以借助temProcessor提供一个处理业务逻辑的功能,并对数据进行相应操作。如果我们在ItemProcessor发现一条数据不应该被写入,可以通过返回null来表示。ItemProcessor和ItemReader以及ItemWriter可以非常好的结合在一起工作,他们之间的数据传输也非常方便。我们直接使用即可。

chunk 处理流程

spring batch提供了让我们按照chunk处理数据的能力,一个chunk的示意图如下:

微信图片_20220907153448.png

它的意思就和图示的一样,由于我们一次batch的任务可能会有很多的数据读写操作,因此一条一条的处理并向数据库提交的话效率不会很高,因此spring batch提供了chunk这个概念,我们可以设定一个chunk size,spring batch 将一条一条处理数据,但不提交到数据库,只有当处理的数据数量达到chunk size设定的值得时候,才一起去commit.

java的实例定义代码如下:

微信图片_20220907153545.png

在上面这个step里面,chunk size被设为了10,当ItemReader读的数据数量达到10的时候,这一批次的数据就一起被传到itemWriter,同时transaction被提交。

skip策略和失败处理

一个batch的job的step,可能会处理非常大数量的数据,难免会遇到出错的情况,出错的情况虽出现的概率较小,但是我们不得不考虑这些情况,因为我们做数据迁移最重要的是要保证数据的最终一致性。spring batch当然也考虑到了这种情况,并且为我们提供了相关的技术支持,请看如下bean的配置:

微信图片_20220907153603.png

我们需要留意这三个方法,分别是skipLimit(),skip(),noSkip(),

skipLimit方法的意思是我们可以设定一个我们允许的这个step可以跳过的异常数量,假如我们设定为10,则当这个step运行时,只要出现的异常数目不超过10,整个step都不会fail。注意,若不设定skipLimit,则其默认值是0.

skip方法我们可以指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的。

noSkip方法的意思则是指出现这个异常我们不想跳过,也就是从skip的所以exception当中排除这个exception,从上面的例子来说,也就是跳过所有除FileNotFoundException的exception。那么对于这个step来说,FileNotFoundException就是一个fatal的exception,抛出这个exception的时候step就会直接fail

批处理操作指南

本部分是一些使用spring batch时的值得注意的点

批处理原则

在构建批处理解决方案时,应考虑以下关键原则和注意事项。

  • 批处理体系结构通常会影响体系结构
  • 尽可能简化并避免在单批应用程序中构建复杂的逻辑结构
  • 保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)。
  • 最大限度地减少系统资源的使用,尤其是I / O. 在internal memory中执行尽可能多的操作。
  • 查看应用程序I / O(分析SQL语句)以确保避免不必要的物理I / O. 特别是,需要寻找以下四个常见缺陷:
  1. 当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据。
  2. 重新读取先前在同一事务中读取数据的事务的数据。
  3. 导致不必要的表或索引扫描。
  4. 未在SQL语句的WHERE子句中指定键值。
  • 在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该(如果可能)在最初处理数据时递增存储的总计,因此您的报告应用程序不必重新处理相同的数据。
  • 在批处理应用程序开始时分配足够的内存,以避免在此过程中进行耗时的重新分配。
  • 总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。
  • 尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总。
  • 在具有真实数据量的类似生产环境中尽早计划和执行压力测试。
  • 在大批量系统中,数据备份可能具有挑战性,特别是如果系统以24-7在线的情况运行。数据库备份通常在在线设计中得到很好的处理,但文件备份应该被视为同样重要。如果系统依赖于文件,则文件备份过程不仅应该到位并记录在案,还应定期进行测试。

如何默认不启动job

在使用java config使用spring batch的job时,如果不做任何配置,项目在启动时就会默认去跑我们定义好的批处理job。那么如何让项目在启动时不自动去跑job呢?

spring batch的job会在项目启动时自动run,如果我们不想让他在启动时run的话,可以在application.properties中添加如下属性:

spring.batch.job.enabled=false

在读数据时内存不够

在使用spring batch做数据迁移时,发现在job启动后,执行到一定时间点时就卡在一个地方不动了,且log也不再打印,等待一段时间之后,得到如下错误:

微信图片_20220907153633.png

红字的信息为:Resource exhaustion event:the JVM was unable to allocate memory from the heap.

翻译过来的意思就是项目发出了一个资源耗尽的事件,告诉我们java虚拟机无法再为堆分配内存。

造成这个错误的原因是: 这个项目里的batch job的reader是一次性拿回了数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。解决的办法有两个:

  • 调整reader读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降
  • 增大service内存
相关文章
|
2月前
|
安全 Java Ruby
我尝试了所有后端框架 — — 这就是为什么只有 Spring Boot 幸存下来
作者回顾后端开发历程,指出多数框架在生产环境中难堪重负。相比之下,Spring Boot凭借内置安全、稳定扩展、完善生态和企业级支持,成为构建高可用系统的首选,真正经受住了时间与规模的考验。
235 2
|
1月前
|
安全 前端开发 Java
《深入理解Spring》:现代Java开发的核心框架
Spring自2003年诞生以来,已成为Java企业级开发的基石,凭借IoC、AOP、声明式编程等核心特性,极大简化了开发复杂度。本系列将深入解析Spring框架核心原理及Spring Boot、Cloud、Security等生态组件,助力开发者构建高效、可扩展的应用体系。(238字)
|
3月前
|
XML JSON Java
Spring框架中常见注解的使用规则与最佳实践
本文介绍了Spring框架中常见注解的使用规则与最佳实践,重点对比了URL参数与表单参数的区别,并详细说明了@RequestParam、@PathVariable、@RequestBody等注解的应用场景。同时通过表格和案例分析,帮助开发者正确选择参数绑定方式,避免常见误区,提升代码的可读性与安全性。
|
1月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
115 8
|
2月前
|
监控 Kubernetes Cloud Native
Spring Batch 批处理框架技术详解与实践指南
本文档全面介绍 Spring Batch 批处理框架的核心架构、关键组件和实际应用场景。作为 Spring 生态系统中专门处理大规模数据批处理的框架,Spring Batch 为企业级批处理作业提供了可靠的解决方案。本文将深入探讨其作业流程、组件模型、错误处理机制、性能优化策略以及与现代云原生环境的集成方式,帮助开发者构建高效、稳定的批处理系统。
339 1
|
4月前
|
安全 Java 微服务
Java 最新技术和框架实操:涵盖 JDK 21 新特性与 Spring Security 6.x 安全框架搭建
本文系统整理了Java最新技术与主流框架实操内容,涵盖Java 17+新特性(如模式匹配、文本块、记录类)、Spring Boot 3微服务开发、响应式编程(WebFlux)、容器化部署(Docker+K8s)、测试与CI/CD实践,附完整代码示例和学习资源推荐,助你构建现代Java全栈开发能力。
504 1
|
3月前
|
Cloud Native Java API
Java Spring框架技术栈选和最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡
Java Spring框架技术栈选和最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡
631 0
|
Java Spring
spring框架之AOP模块(面向切面),附带通知类型---超详细介绍
spring框架之AOP模块(面向切面),附带通知类型---超详细介绍
263 0
|
缓存 监控 Java
Spring框架之AOP(面向切面编程)
Spring框架之AOP(面向切面编程)
164 0
|
XML 设计模式 安全
【Spring框架四】——Spring AOP 注解实现和xml方式实现1
【Spring框架四】——Spring AOP 注解实现和xml方式实现
253 0