Spring Boot 中使用 Function 和异步线程池处理列表拆分任务并汇总结果

简介: 在Java开发中,处理大规模数据时常常需要将列表拆分为多个子列表进行异步处理并汇总结果。本文介绍如何在Spring Boot中使用Function和异步线程池实现高效且可维护的代码,涵盖结果封装、线程池配置、列表拆分处理及结果汇总等关键步骤。

在 Java 开发中,处理大规模数据时,常需要将列表拆分为多个子列表并异步处理,最后汇总结果。本文将介绍如何在 Spring Boot 中利用 Function<List<T>, ProcessResult> 和异步线程池实现这一需求,确保代码的高效性和可维护性。

一、实现思路

  1. 结果封装:定义 ProcessResult 类,封装每个子列表处理后的 int sumStringBuilder msg
  2. 线程池配置:创建独立的线程池,避免与其他异步任务干扰。
  3. 列表处理类:将列表按指定大小拆分,每个子列表通过 Function 处理并提交到线程池异步执行。
  4. 结果汇总:假设主线程需要获取到处理结果的成功记录数以及异常信息的描述,收集所有子任务的结果,计算总和并拼接消息,根据实际需求可以改成其他数据的收集。

二、代码实现

1. 结果封装类 ProcessResult

java

体验AI代码助手

代码解读

复制代码

public class ProcessResult {
    private int sum;
    private StringBuilder msg;

    public ProcessResult(int sum, StringBuilder msg) {
        this.sum = sum;
        this.msg = msg;
    }

    public int getSum() {
        return sum;
    }

    public StringBuilder getMsg() {
        return msg;
    }
}

作用:封装每个子列表处理的结果,便于后续汇总。

2. 线程池配置类 ThreadPoolConfig

java

体验AI代码助手

代码解读

复制代码

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
public class ThreadPoolConfig {
    @Bean(name ="customThreadPool")
    public Executor customThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("custom-thread-");
        executor.initialize();
        return executor;
    }
}

作用:配置独立的线程池 customThreadPool,可根据需求调整线程池参数(如核心线程数、最大线程数等)。建议:通过配置类和配置文件维护线程池参数,这里写成固定值

3. 列表处理类 ListProcessor

java

体验AI代码助手

代码解读

复制代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

@Component
public class ListProcessor<T> {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private ThreadPoolTaskExecutor customThreadPool;

    /**  
     * 异步处理主方法,负责拆分list以及异步调用处理方法,处理方法会返回数据
     */
    public List<ProcessResult> processList(List<T> list, int chunkSize, Function<List<T>, ProcessResult> function)
            throws InterruptedException, ExecutionException {
        List<List<T>> chunks = splitList(list, chunkSize);
        List<CompletableFuture<ProcessResult>> futures = new ArrayList<>();
        ListProcessor<T> self = applicationContext.getBean(this.getClass());
        for (List<T> chunk : chunks) {
            futures.add(self.processChunkAsync(chunk, function));
        }
        List<ProcessResult> results = new ArrayList<>();
        for (CompletableFuture<ProcessResult> future : futures) {
            results.add(future.get());
        }
        return results;
    } 
        
    /**  
     * 异步处理执行方法,通过Async注解实现异步效果
     */  
    @Async("customThreadPool")
    public CompletableFuture<ProcessResult> processChunkAsync(List<T> chunk, Function<List<T>, ProcessResult> function) {
        return CompletableFuture.completedFuture(function.apply(chunk));
    }
    
    /**  
     * 异步处理主方法2,负责拆分list以及异步调用处理方法,使用CountDownLatch保持主线程等待,无需处理方法返回数据时使用该方法,
     * 使用这种方法,仍需要主线程获取处理结果的,可以在主线程中创建一个线程安全的对象,
     * 在子线程的处理方法consumer中去操作该对象,例如使用Collections.synchronizedList创建的集合
     */  
    public <T> void processList2(List<T> list, int chunkSize, Consumer<List<T>> consumer) throws InterruptedException {  
        //list大小小于子集合大小,拆分只会拆出一个子集合,异步没有意义,直接同步执行  
        if (list.size() <= chunkSize) {  
            consumer.accept(list);  
            return;  
        }    //按chunkSize拆分list为多个子集合  
        List<List<T>> chunks = splitList(list, chunkSize);  
        //创建一个CountDownLatch,方便主进程等待  
        CountDownLatch latch = new CountDownLatch(chunks.size());  
        for (List<T> chunk : chunks) {  
            //遍历子集合的集合,依次异步调用处理方法,通过bean调用实现异步  
            getSelf().processChunkAsync(chunk, consumer, latch);  
        }    //阻塞主线程直到异步完成  
        latch.await();  
    }
  
  
    /**  
     * 无需返回值的Consumer调用时的异步处理执行方法,通过Async注解实现异步效果
     */  
    @Async("customThreadPool")  
    public <T> void processChunkAsync(List<T> chunk, Consumer<List<T>> consumer, CountDownLatch latch) {  
        try {  
            consumer.accept(chunk);  
        } finally {  
            latch.countDown();  
        }
    }

    /**  
     * 拆分list为多个子集合,针对最后一个子集合可能存在下标越界的情况,需取集合剩余记录数和子集合大小的较小值
     */  
    private List<List<T>> splitList(List<T> list, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += chunkSize) {
            chunks.add(list.subList(i, Math.min(i + chunkSize, list.size())));
        }
        return chunks;
    }
}

关键逻辑

  • processList 方法:拆分列表并提交异步任务,通过 ApplicationContext 获取当前类的 Bean 实例,确保异步注解生效。
  • splitList 方法:按指定大小将列表拆分为子列表。
  • processChunkAsync 方法:使用 @Async 标记异步执行,调用传入的 Function 处理子列表并返回结果。

4. 主应用类 MainApplication

java

体验AI代码助手

代码解读

复制代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Function;

@SpringBootApplication
@EnableAsync
public class MainApplication implements CommandLineRunner {
    @Autowired
    private ListProcessor<Integer> listProcessor;

    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        int chunkSize = 3;
        // 定义子列表的处理逻辑
        Function<List<Integer>, ProcessResult> function = chunk -> {
            int sum = 0;
            StringBuilder msg = new StringBuilder();
            for (int num : chunk) {
                sum += num;
                msg.append(num).append(" ");
            }
            return new ProcessResult(sum, msg);
        };
        // 执行列表处理并获取结果
        List<ProcessResult> results = listProcessor.processList(list, chunkSize, function);
        // 汇总结果
        int totalSum = 0;
        StringJoiner joiner = new StringJoiner(", ");
        for (ProcessResult result : results) {
            totalSum += result.getSum();
            joiner.add(result.getMsg().toString());
        }
        System.out.println("sum= " + totalSum);
        System.out.println("msg= " + joiner.toString());
    }
}

核心流程

  1. 初始化列表并定义拆分大小。
  2. 使用 Function 定义每个子列表的处理逻辑,计算 sum 并拼接 msg
  3. 调用 ListProcessorprocessList 方法异步处理所有子列表。
  4. 汇总所有子任务的结果,输出最终的总和与拼接后的消息。

三、代码解释

  • ProcessResult:封装每个子任务的处理结果,确保数据传递的清晰性。
  • 线程池配置:通过独立线程池避免资源竞争,提高异步处理的效率。
  • ListProcessor:负责列表拆分、异步任务提交和结果收集,利用 Spring 的 AOP 代理机制确保异步方法生效。
  • 主类逻辑:通过 Function 灵活定义业务逻辑,解耦数据处理与异步执行,使代码更具扩展性。

四、总结

本文介绍的方案通过 Function 和异步线程池实现了列表的高效拆分与处理,具有以下优点:

  1. 线程安全:使用独立线程池和 CompletableFuture 确保异步任务的安全执行。
  2. 代码解耦:将列表拆分、异步执行和业务逻辑分离,提高代码的可维护性。
  3. 结果清晰:通过 ProcessResult 封装结果,便于后续汇总和处理。

此方案适用于需要处理大规模数据并需要异步执行的场景,可根据实际需求调整线程池参数和业务逻辑,具有良好的灵活性和扩展性。


转载来源:https://juejin.cn/post/7482765568123977764

相关文章
|
2月前
|
人工智能 缓存 负载均衡
spring boot-MultipartFile 机制
本文详解了 Spring Boot 中 MultipartFile 的工作机制及大文件上传的解决方案。内容涵盖 MultipartFile 的解析流程、上传配置、Feign 上传大文件的内存问题及基于 RestTemplate 的流式上传实现。同时介绍了服务器端如何直接处理 application/octet-stream 类型的文件流,避免内存溢出问题。适合需要优化文件上传性能的开发者参考。
238 0
|
Java
Java 清空 List 的多种方法?
Java 清空 List 的多种方法?
2686 0
|
网络协议 应用服务中间件 nginx
使用Dockerfile编写源码安装Nginx镜像
使用Dockerfile编写源码安装Nginx镜像
367 0
|
XML Java 数据格式
Spring系列(三)之Bean的生命周期以及Bean的单例与多例模式
Spring系列(三)之Bean的生命周期以及Bean的单例与多例模式
|
移动开发 JavaScript 前端开发
游戏框架 - 描述Phaser、Three.js等JavaScript游戏框架的核心功能和使用场景。
Phaser是开源2D游戏引擎,适合HTML5游戏,内置物理引擎和强大的图形渲染功能,适用于2D游戏,如消消乐。Three.js是基于WebGL的3D库,用于创建和显示3D图形,支持交互和多种3D效果,广泛应用在游戏、可视化等多个领域。两者各有侧重,选择取决于项目需求和图形交互要求。
472 3
|
2月前
|
SQL 关系型数据库 MySQL
MySQL表设计经验
本文介绍了数据库表设计的15个实用技巧,涵盖命名规范、字段类型选择、主键设计、索引优化等方面,帮助后端程序员提升数据库设计能力,避免常见错误,提高系统性能与可维护性。
|
2月前
|
JavaScript 前端开发
es6新增特性
ECMAScript 6(ES6)是JavaScript的重要升级版本,引入了如`let`和`const`声明变量、箭头函数、块级作用域、类、模板字符串、解构赋值等新特性,提升了代码的简洁性与可维护性。
86 0
|
2月前
|
存储 Java
Java对象的内存布局
在HotSpot虚拟机中,Java对象的内存布局分为三部分:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。对象头包含Mark Word、Class对象指针及数组长度;实例数据存储对象的实际字段内容;对齐填充用于确保对象大小为8字节的整数倍。
|
3月前
|
前端开发 Java Spring
SpringBoot之异步调用@Ansyc
本文介绍了在Spring Boot中实现异步任务的方法,通过在启动类或线程池配置类上添加`@EnableAsync`注解开启异步功能。详细说明了线程池属性类的定义,包括核心线程数、最大线程数、队列容量等参数配置。同时,文章指出需要在目标方法上使用`@Async`注解以实现异步执行,并列举了`@Async`注解失效的多种情况,如方法被`static`修饰、类未被Spring扫描、方法调用者与被调用方法在同一类中等。此外,还探讨了解决事务与异步之间矛盾的方案,强调了正确使用`@Transactional`注解的重要性。
267 8

热门文章

最新文章