Spring Batch 小任务(Tasklet)步骤

简介:

Chunk-Oriented Processing不是处理 step 的唯一方法。

考虑下面的一个场景,如果你仅仅需要调用一个存储过程,你可以在 ItemReader 中实现这个调用,然后在存储过程完成调用后返回 null。这种设计看起来不是那么自然也不是非常优美,因为你的批量设计中甚至都不需要实现 ItemWriter。针对这种情况,Spring Batch 为你提供了 TaskletStep 选项。

TaskletStep 是一个简单的接口,这个接口只需要实现一个方法execute,这个方法将会被TaskletStep多次重复的调用,直到这个方法返回 RepeatStatus.FINISHED 或者抛出异常来表示调用失败。

Tasklet 的每一次调用都会包含在事务中(Transaction)。Tasklet 的实现(implementors)可以调用一个存储过程,一个脚本或者一个简单的 SQL 更新脚本。

针对我们的实践中,我们可以使用 Tasklet 来执行一个 FTP 的任务。

将我们产生的中间文件上传到不同的 FTP 服务器上,你可以在实现中指定不同的服务器配置参数,这样更加有利于代码的重用。

为了能够创建一个 TaskletStep,Bean 需要传递一个 tasklet 方法到构造器(builder),这个 tasklet 方法需要实现 Tasklet 接口。

当你构建 TaskletStep 的时候不要调用 chunk。

下面的示例代码显示了一个在 Step build 中构建一个简单的 tasklet。

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
                .tasklet(myTasklet())
                .build();
}

如果你的 tasklet 实现了 StepListener  接口的话,TaskletStep 将会自动将 tasklet 注册成为一个 StepListener。

TaskletAdapter

与 ItemReader 和 ItemWriter 接口的 adapters一样。Tasklet  接口包含的实现也允许能够通过已经存在的类使用 TaskletAdapter 来将自己进行注册。

例如,你希望使用一个已经存在的 DAO 来更新记录集上的标记的时候,你可以使用 TaskletAdapter 来进行实现。

使用 TaskletAdapter  能够让你的 DAO 可以被 Spring Batch 的 TaskletStep 调用而不需要让你的 DAO 都实现 Tasklet 的接口。

如下面的示例代码:

@Bean
public MethodInvokingTaskletAdapter myTasklet() {
    MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

    adapter.setTargetObject(fooDao());
    adapter.setTargetMethod("updateFoo");

    return adapter;
}
Tasklet 实现(Implementation)示例

在主批量作业开始之前,可能需要很多其他的批量作业必须完成,这样以便于主批量作业能够获得必要的资源和在完成后释放资源或者进行清理。

例如我们遇到下面的使用场景,一个批量作业需要大量的对文件进行交互和使用,通常来说需要在文件被上传到其他服务器上后删除本地产生的临时文件。

下面的示例就是一个 Tasklet 的实现,这个Tasklet 的实现能够完成上面的交互要求(文件来自 Spring Batch samples project 示例程序)。

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

 

Tasklet 处理程序实现了将给定目录中的所有文件进行删除。我们应该通知 execute  方法,这个 Tasklet 应该只被执行一次。

所有相关执行的操作需要在 Step 中进行设置,请参考下面有关这个 Tasklet 的设置:

Java 配置

@Bean
public Job taskletJob() {
    return this.jobBuilderFactory.get("taskletJob")
                .start(deleteFilesInDir())
                .build();
}

@Bean
public Step deleteFilesInDir() {
    return this.stepBuilderFactory.get("deleteFilesInDir")
                .tasklet(fileDeletingTasklet())
                .build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
    FileDeletingTasklet tasklet = new FileDeletingTasklet();

    tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));

    return tasklet;
}

https://www.cwiki.us/display/SpringBatchZH/TaskletStep

目录
相关文章
|
2月前
|
JSON 安全 算法
|
2月前
|
Java API Spring
在 Spring 配置文件中配置 Filter 的步骤
【10月更文挑战第21天】在 Spring 配置文件中配置 Filter 是实现请求过滤的重要手段。通过合理的配置,可以灵活地对请求进行处理,满足各种应用需求。还可以根据具体的项目要求和实际情况,进一步深入研究和优化 Filter 的配置,以提高应用的性能和安全性。
|
4月前
|
缓存 NoSQL Java
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
|
5月前
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 Java BI
使用Java和Spring Batch实现批处理
使用Java和Spring Batch实现批处理
|
6月前
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
296 7
|
6月前
|
算法 Java API
在Spring Boot中实现接口签名验证通常涉及以下步骤
在Spring Boot中实现接口签名验证通常涉及以下步骤
494 4
|
6月前
|
前端开发 安全 Java
实现Spring Boot中的文件分片上传通常涉及到以下几个步骤和考虑的关键点
实现Spring Boot中的文件分片上传通常涉及到以下几个步骤和考虑的关键点
333 2
|
6月前
|
Java 数据处理 数据库
Java一分钟之-Spring Batch:批量处理框架
【6月更文挑战第11天】Spring Batch是Spring家族的批处理框架,简化了批量处理任务的开发。它包含Job、Step、ItemReader、ItemProcessor和ItemWriter等核心组件,用于构建数据处理流程。本文讨论了批量处理中的常见问题,如内存溢出、事务管理和异常处理,并提供了相应的解决策略。通过添加相关依赖、定义Job和Steps,以及启动Job的示例代码,帮助开发者开始使用Spring Batch。了解其核心概念和最佳实践,能提升批量处理系统的效率和可靠性。
147 4
|
6月前
|
消息中间件 Java 持续交付
Spring Cloud Alibaba 项目搭建步骤和注意事项
Spring Cloud Alibaba 项目搭建步骤和注意事项
818 0
Spring Cloud Alibaba 项目搭建步骤和注意事项