Spring 官方批处理框架真香!Spring 全家桶永远滴神!

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spring 官方批处理框架真香!Spring 全家桶永远滴神!

推荐一个很多小伙伴没注意到的 Spring 官方的批处理框架。

Spring Batch 是一个轻量级但功能又十分全面的批处理框架,主要用于批处理场景比如从数据库、文件或队列中读取大量记录。不过,需要注意的是:Spring Batch 不是调度框架。商业和开源领域都有许多优秀的企业调度框架比如 Quartz、XXL-JOB、Elastic-Job。它旨在与调度程序一起工作,而不是取代调度程序。

目前,Spring Batch 也已经被收录进了开源项目 awesome-java (非常棒的 Java 开源项目集合)。

项目地址:https://github.com/CodingDocs/awesome-java

4316f94d57ca3f325e75af3bd93aed9.png关于 Spring Batch 的详细介绍可以参考 Spring Batch 官方文档[1],入门教程可以参考下面的内容,原文地址:https://mrbird.cc/Spring-Batch 入门.html 。

项目搭建

新建一个 Spring Boot 项目,版本为 2.2.4.RELEASE,artifactId 为 spring-batch-start,项目结构如下图所示:

366fabbd8f25b4f08107d504eb93fd7.png

然后在 pom.xml 中引入 Spring Batch、MySQL 和 JDBC 依赖,引入后 pom.xml 内容如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cc.mrbird</groupId>
    <artifactId>spring-batch-start</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-batch-start</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

在编写代码之前,我们先来简单了解下 Spring Batch 的组成:

ae4bf24fa94892762002612f88203db.png

Spring Batch 里最基本的单元就是任务 Job,一个 Job 由若干个步骤 Step 组成。

任务启动器 Job Launcher 负责运行 Job。

任务存储仓库 Job Repository 存储着 Job 的执行状态,参数和日志等信息。Job 处理任务又可以分为三大类:

数据读取 Item Reader

数据中间处理 Item Processor

数据输出 Item Writer。

任务存储仓库可以是关系型数据库 MySQL,非关系型数据库 MongoDB 或者直接存储在内存中,本篇使用的是 MySQL 作为任务存储仓库。

新建一个名称为 springbatch 的 MySQL 数据库,然后导入 org.springframework.batch.core 目录下的 schema-mysql.sql 文件:

5207b3a1220935cc7ac75d2eb14b7a0.png

04c99acc76d8a88e001eb8f7e9bda67.png

导入后,库表如下图所示:

a5a416672f198865bb6fda54c05cf93.png

然后在项目的配置文件 application.yml 里添加 MySQL 相关配置:

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/springbatch
    username: root
    password: 123456

接着在 Spring Boot 的入口类上添加 @EnableBatchProcessing 注解,表示开启 Spring Batch 批处理功能:

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchStartApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchStartApplication.class, args);
    }
}

至此,基本框架搭建好了,下面开始配置一个简单的任务。

编写第一个任务

cc.mrbird.batch 目录下新建 job 包,然后在该包下新建一个 FirstJobDemo 类,代码如下所示:

@Component
public class FirstJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job firstJob() {
        return jobBuilderFactory.get("firstJob")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("执行步骤....");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

上面代码中,我们注入了JobBuilderFactory任务创建工厂和StepBuilderFactory步骤创建工厂,分别用于创建任务 Job 和步骤 Step。JobBuilderFactory的get方法用于创建一个指定名称的任务,start方法指定任务的开始步骤,步骤通过StepBuilderFactory构建。

步骤 Step 由若干个小任务 Tasklet 组成,所以我们通过tasklet方法创建。tasklet方法接收一个Tasklet类型参数,Tasklet是一个函数是接口,源码如下:

public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

所以我们可以使用 lambda 表达式创建一个匿名实现:

(contribution, chunkContext) -> {
    System.out.println("执行步骤....");
    return RepeatStatus.FINISHED;
}

该匿名实现必须返回一个明确的执行状态,这里返回RepeatStatus.FINISHED表示该小任务执行成功,正常结束。

此外,需要注意的是,我们配置的任务 Job 必须注册到 Spring IOC 容器中,并且任务的名称和步骤的名称组成唯一。比如上面的例子,我们的任务名称为 firstJob,步骤的名称为 step,如果存在别的任务和步骤组合也叫这个名称的话,则会执行失败。

启动项目,控制台打印日志如下:

...
2020-03-06 11:01:11.785  INFO 17324 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=firstJob]] launched with the following parameters: [{}]
2020-03-06 11:01:11.846  INFO 17324 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
执行步骤....
2020-03-06 11:01:11.886  INFO 17324 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step] executed in 40ms
2020-03-06 11:01:11.909  INFO 17324 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=firstJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 101ms

可以看到,任务成功执行了,数据库的库表也将记录相关运行日志。

重新启动项目,控制台并不会再次打印出任务执行日志,因为 Job 名称和 Step 名称组成唯一,执行完的不可重复的任务,不会再次执行。

多步骤任务

一个复杂的任务一般包含多个步骤,下面举个多步骤任务的例子。在 job 包下新建MultiStepJobDemo类:

@Component
public class MultiStepJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job multiStepJob() {
        return jobBuilderFactory.get("multiStepJob")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();
    }
    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

上面代码中,我们通过step1()、step2()和step3()三个方法创建了三个步骤。Job 里要使用这些步骤,只需要通过JobBuilderFactory的start方法指定第一个步骤,然后通过next方法不断地指定下一个步骤即可。

启动项目,控制台打印日志如下:

2020-03-06 13:52:52.188  INFO 18472 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multiStepJob]] launched with the following parameters: [{}]
2020-03-06 13:52:52.222  INFO 18472 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
执行步骤一操作。。。
2020-03-06 13:52:52.251  INFO 18472 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 29ms
2020-03-06 13:52:52.292  INFO 18472 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
执行步骤二操作。。。
2020-03-06 13:52:52.323  INFO 18472 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step2] executed in 30ms
2020-03-06 13:52:52.375  INFO 18472 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step3]
执行步骤三操作。。。
2020-03-06 13:52:52.405  INFO 18472 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step3] executed in 29ms
2020-03-06 13:52:52.428  INFO 18472 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=multiStepJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 231ms

三个步骤依次执行成功。

多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤,修改上面的代码:

@Component
public class MultiStepJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job multiStepJob() {
        return jobBuilderFactory.get("multiStepJob2")
                .start(step1())
                .on(ExitStatus.COMPLETED.getExitCode()).to(step2())
                .from(step2())
                .on(ExitStatus.COMPLETED.getExitCode()).to(step3())
                .from(step3()).end()
                .build();
    }
    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

multiStepJob()方法的含义是:multiStepJob2 任务先执行 step1,当 step1 状态为完成时,接着执行 step2,当 step2 的状态为完成时,接着执行 step3。ExitStatus.COMPLETED常量表示任务顺利执行完毕,正常退出,该类还包含以下几种退出状态:

public class ExitStatus implements Serializable, Comparable<ExitStatus> {
    /**
     * Convenient constant value representing unknown state - assumed not
     * continuable.
     */
    public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");
    /**
     * Convenient constant value representing continuable state where processing
     * is still taking place, so no further action is required. Used for
     * asynchronous execution scenarios where the processing is happening in
     * another thread or process and the caller is not required to wait for the
     * result.
     */
    public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");
    /**
     * Convenient constant value representing finished processing.
     */
    public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");
    /**
     * Convenient constant value representing job that did no processing (e.g.
     * because it was already complete).
     */
    public static final ExitStatus NOOP = new ExitStatus("NOOP");
    /**
     * Convenient constant value representing finished processing with an error.
     */
    public static final ExitStatus FAILED = new ExitStatus("FAILED");
    /**
     * Convenient constant value representing finished processing with
     * interrupted status.
     */
    public static final ExitStatus STOPPED = new ExitStatus("STOPPED");
    ...
}

启动项目,控制台日志打印如下:

2020-03-06 14:21:49.384  INFO 18745 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=multiStepJob2]] launched with the following parameters: [{}]
2020-03-06 14:21:49.427  INFO 18745 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
执行步骤一操作。。。
2020-03-06 14:21:49.456  INFO 18745 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 29ms
2020-03-06 14:21:49.501  INFO 18745 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
执行步骤二操作。。。
2020-03-06 14:21:49.527  INFO 18745 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step2] executed in 26ms
2020-03-06 14:21:49.576  INFO 18745 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step3]
执行步骤三操作。。。
2020-03-06 14:21:49.604  INFO 18745 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step3] executed in 28ms
2020-03-06 14:21:49.629  INFO 18745 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=multiStepJob2]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 238ms

Flow 的用法

Flow 的作用就是可以将多个步骤 Step 组合在一起然后再组装到任务 Job 中。举个 Flow 的例子,在 job 包下新建FlowJobDemo类:

@Component
public class FlowJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job flowJob() {
        return jobBuilderFactory.get("flowJob")
                .start(flow())
                .next(step3())
                .end()
                .build();
    }
    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    // 创建一个flow对象,包含若干个step
    private Flow flow() {
        return new FlowBuilder<Flow>("flow")
                .start(step1())
                .next(step2())
                .build();
    }
}

上面代码中,我们通过FlowBuilder将 step1 和 step2 组合在一起,创建了一个名为 flow 的 Flow,然后再将其赋给任务 Job。使用 Flow 和 Step 构建 Job 的区别是,Job 流程中包含 Flow 类型的时候需要在build()方法前调用end()方法。

启动程序,控制台日志打印如下:

2020-03-06 14:36:42.621  INFO 18865 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowJob]] launched with the following parameters: [{}]
2020-03-06 14:36:42.667  INFO 18865 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
执行步骤一操作。。。
2020-03-06 14:36:42.697  INFO 18865 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 30ms
2020-03-06 14:36:42.744  INFO 18865 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
执行步骤二操作。。。
2020-03-06 14:36:42.771  INFO 18865 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step2] executed in 27ms
2020-03-06 14:36:42.824  INFO 18865 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step3]
执行步骤三操作。。。
2020-03-06 14:36:42.850  INFO 18865 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step3] executed in 25ms
2020-03-06 14:36:42.874  INFO 18865 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=flowJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 245ms

并行执行

任务中的步骤除了可以串行执行(一个接着一个执行)外,还可以并行执行,并行执行在特定的业务需求下可以提供任务执行效率。

将任务并行化只需两个简单步骤:

  1. 将步骤 Step 转换为 Flow;
  2. 任务 Job 中指定并行 Flow。

举个例子,在 job 包下新建SplitJobDemo类:

@Component
public class SplitJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job splitJob() {
        return jobBuilderFactory.get("splitJob")
                .start(flow1())
                .split(new SimpleAsyncTaskExecutor()).add(flow2())
                .end()
                .build();
    }
    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Flow flow1() {
        return new FlowBuilder<Flow>("flow1")
                .start(step1())
                .next(step2())
                .build();
    }
    private Flow flow2() {
        return new FlowBuilder<Flow>("flow2")
                .start(step3())
                .build();
    }
}

上面例子中,我们创建了两个 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通过JobBuilderFactory的split方法,指定一个异步执行器,将 flow1 和 flow2 异步执行(也就是并行)。

启动项目,控制台日志打印如下:

2020-03-06 15:25:43.602  INFO 19449 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=splitJob]] launched with the following parameters: [{}]
2020-03-06 15:25:43.643  INFO 19449 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step3]
2020-03-06 15:25:43.650  INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
执行步骤三操作。。。
执行步骤一操作。。。
2020-03-06 15:25:43.673  INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 23ms
2020-03-06 15:25:43.674  INFO 19449 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep         : Step: [step3] executed in 31ms
2020-03-06 15:25:43.714  INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
执行步骤二操作。。。
2020-03-06 15:25:43.738  INFO 19449 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Step: [step2] executed in 24ms
2020-03-06 15:25:43.758  INFO 19449 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=splitJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 146ms

可以看到 step3 并没有在 step2 后才执行,说明步骤已经是并行化的(开启并行化后,并行的步骤执行顺序并不能 100%确定,因为线程调度具有不确定性)。

任务决策器

决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则之心 step1 和 step3。

使用决策器前,我们需要自定义一个决策器的实现。在 cc.mrbird.batch 包下新建 decider 包,然后创建MyDecider类,实现JobExecutionDecider接口:

@Component
public class MyDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        LocalDate now = LocalDate.now();
        DayOfWeek dayOfWeek = now.getDayOfWeek();
        if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
            return new FlowExecutionStatus("weekend");
        } else {
            return new FlowExecutionStatus("workingDay");
        }
    }
}

MyDecider实现JobExecutionDecider接口的decide方法,该方法返回FlowExecutionStatus。上面的逻辑是:判断今天是否是周末,如果是,返回FlowExecutionStatus("weekend")状态,否则返回FlowExecutionStatus("workingDay")状态。

下面演示如何在任务 Job 里使用决策器。在 job 包下新建DeciderJobDemo:

@Component
public class DeciderJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private MyDecider myDecider;
    @Bean
    public Job deciderJob() {
        return jobBuilderFactory.get("deciderJob")
                .start(step1())
                .next(myDecider)
                .from(myDecider).on("weekend").to(step2())
                .from(myDecider).on("workingDay").to(step3())
                .from(step3()).on("*").to(step4())
                .end()
                .build();
    }
    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
    private Step step4() {
        return stepBuilderFactory.get("step4")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤四操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

上面代码中,我们注入了自定义决策器MyDecider,然后在jobDecider()方法里使用了该决策器:

@Bean
public Job deciderJob() {
    return jobBuilderFactory.get("deciderJob")
            .start(step1())
            .next(myDecider)
            .from(myDecider).on("weekend").to(step2())
            .from(myDecider).on("workingDay").to(step3())
            .from(step3()).on("*").to(step4())
            .end()
            .build();
}

这段代码的含义是:任务 deciderJob 首先执行 step1,然后指定自定义决策器,如果决策器返回 weekend,那么执行 step2,如果决策器返回 workingDay,那么执行 step3。如果执行了 step3,那么无论 step3 的结果是什么,都将执行 step4。

启动项目,控制台输出如下所示:

2020-03-06 16:09:10.541  INFO 19873 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=deciderJob]] launched with the following parameters: [{}]
2020-03-06 16:09:10.609  INFO 19873 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
执行步骤一操作。。。
2020-03-06 16:09:10.641  INFO 19873 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 32ms
2020-03-06 16:09:10.692  INFO 19873 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step3]
执行步骤三操作。。。
2020-03-06 16:09:10.723  INFO 19873 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step3] executed in 31ms
2020-03-06 16:09:10.769  INFO 19873 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step4]
执行步骤四操作。。。
2020-03-06 16:09:10.797  INFO 19873 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step4] executed in 27ms
2020-03-06 16:09:10.818  INFO 19873 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=deciderJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 256ms

因为今天是 2020 年 03 月 06 日星期五,是工作日,所以任务执行了 step1、step3 和 step4。

任务嵌套

任务 Job 除了可以由 Step 或者 Flow 构成外,我们还可以将多个任务 Job 转换为特殊的 Step,然后再赋给另一个任务 Job,这就是任务的嵌套。

举个例子,在 job 包下新建NestedJobDemo类:

@Component
public class NestedJobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;
    // 父任务
    @Bean
    public Job parentJob() {
        return jobBuilderFactory.get("parentJob")
                .start(childJobOneStep())
                .next(childJobTwoStep())
                .build();
    }
    // 将任务转换为特殊的步骤
    private Step childJobOneStep() {
        return new JobStepBuilder(new StepBuilder("childJobOneStep"))
                .job(childJobOne())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
    // 将任务转换为特殊的步骤
    private Step childJobTwoStep() {
        return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
                .job(childJobTwo())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
    // 子任务一
    private Job childJobOne() {
        return jobBuilderFactory.get("childJobOne")
                .start(
                    stepBuilderFactory.get("childJobOneStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务一执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
    // 子任务二
    private Job childJobTwo() {
        return jobBuilderFactory.get("childJobTwo")
                .start(
                    stepBuilderFactory.get("childJobTwoStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务二执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
}

上面代码中,我们通过childJobOne()和childJobTwo()方法创建了两个任务 Job,这里没什么好说的,前面都介绍过。关键在于childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,我们通过JobStepBuilder构建了一个名称为childJobOneStep的 Step,顾名思义,它是一个任务型 Step 的构造工厂,可以将任务转换为“特殊”的步骤。在构建过程中,我们还需要传入任务执行器 JobLauncher、任务仓库 JobRepository 和事务管理器 PlatformTransactionManager。

将任务转换为特殊的步骤后,将其赋给父任务 parentJob 即可,流程和前面介绍的一致。

配置好后,启动项目,控制台输出如下所示:

2020-03-06 16:58:39.771  INFO 21588 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=parentJob]] launched with the following parameters: [{}]
2020-03-06 16:58:39.812  INFO 21588 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [childJobOneStep]
2020-03-06 16:58:39.866  INFO 21588 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=childJobOne]] launched with the following parameters: [{}]
2020-03-06 16:58:39.908  INFO 21588 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [childJobOneStep]
子任务一执行步骤。。。
2020-03-06 16:58:39.940  INFO 21588 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [childJobOneStep] executed in 32ms
2020-03-06 16:58:39.960  INFO 21588 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=childJobOne]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 86ms
2020-03-06 16:58:39.983  INFO 21588 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [childJobOneStep] executed in 171ms
2020-03-06 16:58:40.019  INFO 21588 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [childJobTwoStep]
2020-03-06 16:58:40.067  INFO 21588 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=childJobTwo]] launched with the following parameters: [{}]
2020-03-06 16:58:40.102  INFO 21588 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [childJobTwoStep]
子任务二执行步骤。。。
2020-03-06 16:58:40.130  INFO 21588 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [childJobTwoStep] executed in 28ms
2020-03-06 16:58:40.152  INFO 21588 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=childJobTwo]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 75ms
2020-03-06 16:58:40.157  INFO 21588 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [childJobTwoStep] executed in 138ms
2020-03-06 16:58:40.177  INFO 21588 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=parentJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 398ms


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
17天前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
26天前
|
Java API 数据库
Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐
本文通过在线图书管理系统案例,详细介绍如何使用Spring Boot构建RESTful API。从项目基础环境搭建、实体类与数据访问层定义,到业务逻辑实现和控制器编写,逐步展示了Spring Boot的简洁配置和强大功能。最后,通过Postman测试API,并介绍了如何添加安全性和异常处理,确保API的稳定性和安全性。
34 0
|
21天前
|
前端开发 Java 数据库连接
Spring 框架:Java 开发者的春天
Spring 框架是一个功能强大的开源框架,主要用于简化 Java 企业级应用的开发,由被称为“Spring 之父”的 Rod Johnson 于 2002 年提出并创立,并由Pivotal团队维护。
41 1
Spring 框架:Java 开发者的春天
|
13天前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个前后端分离的应用框架,实现动态路由和菜单功能
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个前后端分离的应用框架,实现动态路由和菜单功能。首先,确保开发环境已安装必要的工具,然后创建并配置 Spring Boot 项目,包括添加依赖和配置 Spring Security。接着,创建后端 API 和前端项目,配置动态路由和菜单。最后,运行项目并分享实践心得,帮助开发者提高开发效率和应用的可维护性。
31 2
|
13天前
|
消息中间件 NoSQL Java
springboot整合常用中间件框架案例
该项目是Spring Boot集成整合案例,涵盖多种中间件的使用示例,每个案例项目使用最小依赖,便于直接应用到自己的项目中。包括MyBatis、Redis、MongoDB、MQ、ES等的整合示例。
61 1
|
21天前
|
Java 数据库连接 开发者
Spring 框架:Java 开发者的春天
【10月更文挑战第27天】Spring 框架由 Rod Johnson 在 2002 年创建,旨在解决 Java 企业级开发中的复杂性问题。它通过控制反转(IOC)和面向切面的编程(AOP)等核心机制,提供了轻量级的容器和丰富的功能,支持 Web 开发、数据访问等领域,显著提高了开发效率和应用的可维护性。Spring 拥有强大的社区支持和丰富的生态系统,是 Java 开发不可或缺的工具。
|
27天前
|
人工智能 开发框架 Java
总计 30 万奖金,Spring AI Alibaba 应用框架挑战赛开赛
Spring AI Alibaba 应用框架挑战赛邀请广大开发者参与开源项目的共建,助力项目快速发展,掌握 AI 应用开发模式。大赛分为《支持 Spring AI Alibaba 应用可视化调试与追踪本地工具》和《基于 Flow 的 AI 编排机制设计与实现》两个赛道,总计 30 万奖金。
|
2月前
|
SQL 监控 druid
springboot-druid数据源的配置方式及配置后台监控-自定义和导入stater(推荐-简单方便使用)两种方式配置druid数据源
这篇文章介绍了如何在Spring Boot项目中配置和监控Druid数据源,包括自定义配置和使用Spring Boot Starter两种方法。
|
1月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
161 2
|
6天前
|
缓存 IDE Java
SpringBoot入门(7)- 配置热部署devtools工具
SpringBoot入门(7)- 配置热部署devtools工具
18 2
 SpringBoot入门(7)- 配置热部署devtools工具