Spring Boot 中使用 Function 和异步线程池处理列表拆分任务并汇总结果

简介: 在Java开发中,处理大规模数据时常常需要将列表拆分为多个子列表进行异步处理并汇总结果。本文介绍如何在Spring Boot中使用Function和异步线程池实现高效且可维护的代码,涵盖结果封装、线程池配置、列表拆分处理及结果汇总等关键步骤。

在 Java 开发中,处理大规模数据时,常需要将列表拆分为多个子列表并异步处理,最后汇总结果。本文将介绍如何在 Spring Boot 中利用 Function<List<T>, ProcessResult> 和异步线程池实现这一需求,确保代码的高效性和可维护性。

一、实现思路

  1. 结果封装:定义 ProcessResult 类,封装每个子列表处理后的 int sumStringBuilder msg
  2. 线程池配置:创建独立的线程池,避免与其他异步任务干扰。
  3. 列表处理类:将列表按指定大小拆分,每个子列表通过 Function 处理并提交到线程池异步执行。
  4. 结果汇总:假设主线程需要获取到处理结果的成功记录数以及异常信息的描述,收集所有子任务的结果,计算总和并拼接消息,根据实际需求可以改成其他数据的收集。

二、代码实现

1. 结果封装类 ProcessResult

java

体验AI代码助手

代码解读

复制代码

public class ProcessResult {
    private int sum;
    private StringBuilder msg;

    public ProcessResult(int sum, StringBuilder msg) {
        this.sum = sum;
        this.msg = msg;
    }

    public int getSum() {
        return sum;
    }

    public StringBuilder getMsg() {
        return msg;
    }
}

作用:封装每个子列表处理的结果,便于后续汇总。

2. 线程池配置类 ThreadPoolConfig

java

体验AI代码助手

代码解读

复制代码

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
public class ThreadPoolConfig {
    @Bean(name ="customThreadPool")
    public Executor customThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("custom-thread-");
        executor.initialize();
        return executor;
    }
}

作用:配置独立的线程池 customThreadPool,可根据需求调整线程池参数(如核心线程数、最大线程数等)。建议:通过配置类和配置文件维护线程池参数,这里写成固定值

3. 列表处理类 ListProcessor

java

体验AI代码助手

代码解读

复制代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

@Component
public class ListProcessor<T> {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private ThreadPoolTaskExecutor customThreadPool;

    /**  
     * 异步处理主方法,负责拆分list以及异步调用处理方法,处理方法会返回数据
     */
    public List<ProcessResult> processList(List<T> list, int chunkSize, Function<List<T>, ProcessResult> function)
            throws InterruptedException, ExecutionException {
        List<List<T>> chunks = splitList(list, chunkSize);
        List<CompletableFuture<ProcessResult>> futures = new ArrayList<>();
        ListProcessor<T> self = applicationContext.getBean(this.getClass());
        for (List<T> chunk : chunks) {
            futures.add(self.processChunkAsync(chunk, function));
        }
        List<ProcessResult> results = new ArrayList<>();
        for (CompletableFuture<ProcessResult> future : futures) {
            results.add(future.get());
        }
        return results;
    } 
        
    /**  
     * 异步处理执行方法,通过Async注解实现异步效果
     */  
    @Async("customThreadPool")
    public CompletableFuture<ProcessResult> processChunkAsync(List<T> chunk, Function<List<T>, ProcessResult> function) {
        return CompletableFuture.completedFuture(function.apply(chunk));
    }
    
    /**  
     * 异步处理主方法2,负责拆分list以及异步调用处理方法,使用CountDownLatch保持主线程等待,无需处理方法返回数据时使用该方法,
     * 使用这种方法,仍需要主线程获取处理结果的,可以在主线程中创建一个线程安全的对象,
     * 在子线程的处理方法consumer中去操作该对象,例如使用Collections.synchronizedList创建的集合
     */  
    public <T> void processList2(List<T> list, int chunkSize, Consumer<List<T>> consumer) throws InterruptedException {  
        //list大小小于子集合大小,拆分只会拆出一个子集合,异步没有意义,直接同步执行  
        if (list.size() <= chunkSize) {  
            consumer.accept(list);  
            return;  
        }    //按chunkSize拆分list为多个子集合  
        List<List<T>> chunks = splitList(list, chunkSize);  
        //创建一个CountDownLatch,方便主进程等待  
        CountDownLatch latch = new CountDownLatch(chunks.size());  
        for (List<T> chunk : chunks) {  
            //遍历子集合的集合,依次异步调用处理方法,通过bean调用实现异步  
            getSelf().processChunkAsync(chunk, consumer, latch);  
        }    //阻塞主线程直到异步完成  
        latch.await();  
    }
  
  
    /**  
     * 无需返回值的Consumer调用时的异步处理执行方法,通过Async注解实现异步效果
     */  
    @Async("customThreadPool")  
    public <T> void processChunkAsync(List<T> chunk, Consumer<List<T>> consumer, CountDownLatch latch) {  
        try {  
            consumer.accept(chunk);  
        } finally {  
            latch.countDown();  
        }
    }

    /**  
     * 拆分list为多个子集合,针对最后一个子集合可能存在下标越界的情况,需取集合剩余记录数和子集合大小的较小值
     */  
    private List<List<T>> splitList(List<T> list, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += chunkSize) {
            chunks.add(list.subList(i, Math.min(i + chunkSize, list.size())));
        }
        return chunks;
    }
}

关键逻辑

  • processList 方法:拆分列表并提交异步任务,通过 ApplicationContext 获取当前类的 Bean 实例,确保异步注解生效。
  • splitList 方法:按指定大小将列表拆分为子列表。
  • processChunkAsync 方法:使用 @Async 标记异步执行,调用传入的 Function 处理子列表并返回结果。

4. 主应用类 MainApplication

java

体验AI代码助手

代码解读

复制代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Function;

@SpringBootApplication
@EnableAsync
public class MainApplication implements CommandLineRunner {
    @Autowired
    private ListProcessor<Integer> listProcessor;

    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        int chunkSize = 3;
        // 定义子列表的处理逻辑
        Function<List<Integer>, ProcessResult> function = chunk -> {
            int sum = 0;
            StringBuilder msg = new StringBuilder();
            for (int num : chunk) {
                sum += num;
                msg.append(num).append(" ");
            }
            return new ProcessResult(sum, msg);
        };
        // 执行列表处理并获取结果
        List<ProcessResult> results = listProcessor.processList(list, chunkSize, function);
        // 汇总结果
        int totalSum = 0;
        StringJoiner joiner = new StringJoiner(", ");
        for (ProcessResult result : results) {
            totalSum += result.getSum();
            joiner.add(result.getMsg().toString());
        }
        System.out.println("sum= " + totalSum);
        System.out.println("msg= " + joiner.toString());
    }
}

核心流程

  1. 初始化列表并定义拆分大小。
  2. 使用 Function 定义每个子列表的处理逻辑,计算 sum 并拼接 msg
  3. 调用 ListProcessorprocessList 方法异步处理所有子列表。
  4. 汇总所有子任务的结果,输出最终的总和与拼接后的消息。

三、代码解释

  • ProcessResult:封装每个子任务的处理结果,确保数据传递的清晰性。
  • 线程池配置:通过独立线程池避免资源竞争,提高异步处理的效率。
  • ListProcessor:负责列表拆分、异步任务提交和结果收集,利用 Spring 的 AOP 代理机制确保异步方法生效。
  • 主类逻辑:通过 Function 灵活定义业务逻辑,解耦数据处理与异步执行,使代码更具扩展性。

四、总结

本文介绍的方案通过 Function 和异步线程池实现了列表的高效拆分与处理,具有以下优点:

  1. 线程安全:使用独立线程池和 CompletableFuture 确保异步任务的安全执行。
  2. 代码解耦:将列表拆分、异步执行和业务逻辑分离,提高代码的可维护性。
  3. 结果清晰:通过 ProcessResult 封装结果,便于后续汇总和处理。

此方案适用于需要处理大规模数据并需要异步执行的场景,可根据实际需求调整线程池参数和业务逻辑,具有良好的灵活性和扩展性。


转载来源:https://juejin.cn/post/7482765568123977764

相关文章
|
druid Java 数据库
Spring Boot的定时任务与异步任务
Spring Boot的定时任务与异步任务
|
18天前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
11月前
|
自然语言处理 JavaScript Java
Spring 实现 3 种异步流式接口,干掉接口超时烦恼
本文介绍了处理耗时接口的几种异步流式技术,包括 `ResponseBodyEmitter`、`SseEmitter` 和 `StreamingResponseBody`。这些工具可在执行耗时操作时不断向客户端响应处理结果,提升用户体验和系统性能。`ResponseBodyEmitter` 适用于动态生成内容场景,如文件上传进度;`SseEmitter` 用于实时消息推送,如状态更新;`StreamingResponseBody` 则适合大数据量传输,避免内存溢出。文中提供了具体示例和 GitHub 地址,帮助读者更好地理解和应用这些技术。
2042 121
|
Java Spring 容器
Spring使用异步注解@Async正确姿势
Spring使用异步注解@Async正确姿势,异步任务,spring boot
170 3
|
Java Spring 容器
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
887 3
|
存储 缓存 安全
Spring初始化加速的思路和方案问题之手动指定要异步初始化的bean中的问题如何解决
Spring初始化加速的思路和方案问题之手动指定要异步初始化的bean中的问题如何解决
114 2
|
前端开发 Java API
异步编程 - 11 Spring WebFlux的异步非阻塞处理2
异步编程 - 11 Spring WebFlux的异步非阻塞处理2
302 0
|
监控 Java API
Spring Boot中的异步革命:构建高性能的现代Web应用
【8月更文挑战第29天】Spring Boot 是一个简化 Spring 应用开发与部署的框架。异步任务处理通过后台线程执行耗时操作,提升用户体验和系统并发能力。要在 Spring Boot 中启用异步任务,需在配置类上添加 `@EnableAsync` 注解,并定义一个自定义的 `ThreadPoolTaskExecutor` 或使用默认线程池。通过 `@Async` 注解的方法将在异步线程中执行。异步任务适用于发送电子邮件、数据处理、外部 API 调用和定时任务等场景。最佳实践中应注意正确配置线程池、处理返回值和异常、以及监控任务状态,确保系统的稳定性和健壮性。
166 0
|
Java 开发者 Spring
Spring Boot大法好:解耦、隔离、异步,让代码‘活’起来,性能飙升的秘密武器!
【8月更文挑战第29天】解耦、隔离与异步是Spring Boot中的关键设计原则,能大幅提升软件的可维护性、扩展性和性能。本文通过示例代码详细探讨了这些原则的应用:依赖注入和面向接口编程实现解耦;模块化设计与配置文件实现隔离;`@Async`注解和`CompletableFuture`实现异步处理。综合运用这些原则,可以显著提升软件质量和性能,使系统更加健壮、灵活和高效。
198 0
|
安全 Java 数据库连接
Spring Boot 优雅关机时异步线程安全优化
Spring Boot 优雅关机时异步线程安全优化
373 1