Spring Boot中的批处理任务实现

简介: Spring Boot中的批处理任务实现

Spring Boot中的批处理任务实现

今天我们来探讨一下在Spring Boot中如何实现批处理任务。

1. 引言

在日常开发中,我们经常需要处理大批量的数据操作,例如定期导入导出数据、数据清理、统计分析等。这些任务通常需要在后台运行,不会阻塞主应用的正常操作。Spring Batch是一个轻量级的批处理框架,专门用于处理这些批量任务。本文将详细介绍如何在Spring Boot项目中使用Spring Batch实现批处理任务。

2. 引入依赖

首先,我们需要在pom.xml中添加Spring Batch的依赖。

<dependencies>
    <!-- Spring Batch Core -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>4.3.4</version>
    </dependency>
    <!-- Spring Batch Infrastructure -->
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-infrastructure</artifactId>
        <version>4.3.4</version>
    </dependency>
    <!-- Spring Boot Starter Batch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- H2 Database for testing -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

3. 配置批处理任务

3.1 创建批处理配置类

在Spring Boot中,我们可以通过配置类来定义批处理任务的步骤和作业。下面是一个简单的批处理配置类示例。

package cn.juwatech.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {
   

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
   
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public Job importUserJob(JobExecutionListener listener) {
   
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
   
        return stepBuilderFactory.get("step1")
                .tasklet(helloWorldTasklet())
                .build();
    }

    @Bean
    public Tasklet helloWorldTasklet() {
   
        return (contribution, chunkContext) -> {
   
            System.out.println("Hello, World!");
            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public JobExecutionListener listener() {
   
        return new JobExecutionListenerSupport() {
   
            @Override
            public void afterJob(org.springframework.batch.core.JobExecution jobExecution) {
   
                System.out.println("Job finished!");
            }
        };
    }
}
3.2 配置数据源

为了运行Spring Batch作业,我们需要配置数据源。这里我们使用H2数据库进行示例。

spring:
  datasource:
    url: jdbc:h2:mem:testdb
    driver-class-name: org.h2.Driver
    username: sa
    password: password
  h2:
    console:
      enabled: true

4. 创建批处理任务

在上面的配置类中,我们定义了一个简单的任务步骤step1,该步骤执行一个打印"Hello, World!"的任务。接下来,我们可以创建一个批处理任务的具体实现。

package cn.juwatech.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchJobRunner implements CommandLineRunner {
   

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importUserJob;

    @Override
    public void run(String... args) throws Exception {
   
        jobLauncher.run(importUserJob, new org.springframework.batch.core.JobParameters());
    }
}

5. 测试批处理任务

启动Spring Boot应用程序,查看控制台输出,可以看到"Hello, World!"和"Job finished!"的输出,表示批处理任务已成功执行。

6. 扩展批处理任务

在实际应用中,批处理任务可能更加复杂,例如读取文件、处理数据、写入数据库等。我们可以使用Spring Batch提供的ItemReader、ItemProcessor和ItemWriter接口来实现这些功能。

下面是一个从CSV文件读取数据、处理数据并写入到数据库的示例。

package cn.juwatech.batch;

import cn.juwatech.entity.User;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

@Component
public class CsvBatchConfig {
   

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public ItemReader<User> reader() {
   
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new ClassPathResource("users.csv"))
                .delimited()
                .names(new String[]{
   "id", "name"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {
   {
   
                    setTargetType(User.class);
                }})
                .build();
    }

    @Bean
    public ItemProcessor<User, User> processor() {
   
        return user -> {
   
            user.setName(user.getName().toUpperCase());
            return user;
        };
    }

    @Bean
    public ItemWriter<User> writer() {
   
        return items -> {
   
            items.forEach(item -> System.out.println("Writing item: " + item));
        };
    }

    @Bean
    public Step csvStep() {
   
        return stepBuilderFactory.get("csvStep")
                .<User, User>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
}

7. 总结

通过本文的介绍,我们了解了如何在Spring Boot中使用Spring Batch实现批处理任务,并通过具体的代码实例展示了如何配置和运行批处理任务。Spring Batch是一个功能强大且灵活的批处理框架,适用于各种批处理场景。在实际开发中,我们可以根据具体需求灵活配置和扩展批处理任务,提高应用的处理能力和效率。

相关文章
|
5月前
|
druid Java 数据库
Spring Boot的定时任务与异步任务
Spring Boot的定时任务与异步任务
|
5月前
|
监控 Java 数据处理
【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解
【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解
消息中间件 缓存 监控
81 0
|
2月前
|
Java 数据安全/隐私保护
SpringBoot 自定义初始化任务 Runner
SpringBoot 自定义初始化任务 Runner
11 0
|
3月前
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
JavaScript Java 测试技术
基于springboot+vue.js+uniapp小程序的校园悬赏任务平台附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp小程序的校园悬赏任务平台附带文章源码部署视频讲解等
30 0
|
4月前
|
Java 测试技术
springboot延时任务
springboot延时任务
|
4月前
|
SQL API 调度
Springboot2.4.5集成Quartz实现动态任务数据持久化-不怕重启服务
Springboot2.4.5集成Quartz实现动态任务数据持久化-不怕重启服务
|
4月前
|
Java API 调度
Web后端Javaee企业级开发之定时任务 Springboot整合任务框架Quartz和Task详解
Web后端Javaee企业级开发之定时任务 Springboot整合任务框架Quartz和Task详解
53 0
|
5月前
|
SQL Java 调度
SpringBoot使用@Scheduled定时任务录入将要过期任务数据
SpringBoot使用@Scheduled定时任务录入将要过期任务数据