基于Java多线程处理数据

简介: 【4月更文挑战第9天】 基于Java多线程处理数据

背景

在日常工作中,有一个同步企微客户-学员关系接口的定时任务在执行中随着数据量的不断增长,定时任务的执行结束时间也出现了当天执行不完的情况,影响到了正常业务的运行。基于这种情况,在对该定时任务的业务逻辑代码分析验证后得出是调用企微客户-学员关系接口时耗时引起的,但是查阅企微接口文档,又不支持批量调用,只能逐个调用。那么这种情况下既然批量调用接口不支持,那么可以采用多线程并发调用的方式来降低定时任务整体的执行时间,于是就需要用到线程池来进行多线程操作。

代码实现

在这里我将会使用spring自带的线程池类ThreadPoolTaskExecutor来进行处理,ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理,源代码中可以看到

image.png

而线程池类ThreadPoolExecutor 是JDK的线程池类,继承 AbstractExecutorService ,

public class ThreadPoolExecutor extends AbstractExecutorService {

AbstractExecutorService 实现 ExecutorService,

public abstract class AbstractExecutorService implements ExecutorService {

ExecutorService 继承 Executor

public interface ExecutorService extends Executor {

下面开始初始化线程池类 ThreadPoolTaskExecutor,配置类 ThreadPoolConfig 代码如下

/**
 * 线程池配置
 *
 **/
@Configuration
public class ThreadPoolConfig
{
    // 核心线程池大小
    private int corePoolSize = 50;

    // 最大可创建的线程数
    private int maxPoolSize = 200;

    // 队列最大长度
    private int queueCapacity = 1000;

    // 线程池维护线程所允许的空闲时间
    private int keepAliveSeconds = 300;

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

补充同步企微客户-学员关系定时任务 SyncWechatWorkCustomerLinkDetailHandler 代码如下

@Component
@JobHandler("syncWechatWorkCustomerLinkDetailHandler")
public class SyncWechatWorkCustomerLinkDetailHandler extends IJobHandler {

    @Autowired
    private IWechatCustomerLinkDetailService wechatCustomerLinkDetailService;

    @Override
    public ReturnT<String> execute(String params) throws Exception {
        wechatCustomerLinkDetailService.syncWechatWorkCustomerLinkDetail(params);
        return ReturnT.SUCCESS;
    }
}

业务处理实现类 syncWechatWorkCustomerLinkDetail 代码如下

    @Override
    public void syncWechatWorkCustomerLinkDetail(String params) {
        XxlJobLogger.log("补充任务开始执行...[{}]",params);

        //查询条件对象
        WechatCustomerLinkDetail searchparam = new WechatCustomerLinkDetail();
        if (StringUtils.isNotEmpty(params)) {
            Long[] ids = Convert.toLongArray(params);
            //根据ids查询数据
            searchparam.setLinkIds(ids);
        }
        // 分页查询企微获客助手客户链接
        int pageNo = 0;
        int pageSize = 200;
        while(true){
            pageNo++;
            XxlJobLogger.log("第【{}】页数据开始补充...",pageNo);
            PageHelper.startPage(pageNo, pageSize);
            PageHelper.orderBy("id asc");
            List<WechatCustomerLinkDetail> list = wechatCustomerLinkDetailMapper.selectWechatCustomerLinkDetailList(searchparam);
            PageHelper.clearPage();
            if (CollUtil.isEmpty(list) ) {
                break;
            }
            //开始补充数据
            multiThreadProcessData(list);
            XxlJobLogger.log("第【{}】页数据补充完成...",pageNo);
        }
    }

多线程处理列表中的数据类 multiThreadProcessData 代码如下

    /**
     * 使用多线程处理列表中的数据
     * @param list 待处理的微信客户链接详情列表
     */
    public void multiThreadProcessData(List<WechatCustomerLinkDetail> list) {
        // 将大集合分割为多个小集合,以便多线程处理
        List<List<WechatCustomerLinkDetail>> partitionData = partitionData(list, 10);
        // 获取线程池执行器
        ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor");
        // 创建计数器,用于线程同步
        CountDownLatch latch = new CountDownLatch(partitionData.size());

        for (List<WechatCustomerLinkDetail> details : partitionData) {
            // 提交任务给线程池执行,每个任务负责处理一个分割后的列表
            executor.execute(() -> {
                try {
                    for (WechatCustomerLinkDetail detail : details) {
                        //打印线程名称
                        //System.out.println("name========"+Thread.currentThread().getName());
                        // 对每个详情进行处理,填充微信用户名称信息  这里就是业务逻辑处理的地方
                        fillWechatUserNameInfo(detail);
                    }
                } catch (Exception e) {
                    // 捕获异常并打印,避免线程异常中断
                    e.printStackTrace();
                } finally {
                    // 处理完成后,计数器减一,用于线程同步
                    latch.countDown();
                }
            });
        }

        // 等待所有任务完成
        try {
            latch.await();
        } catch (InterruptedException e) {
            // 线程被中断,打印异常信息
            e.printStackTrace();
        }
    }

分割数据列表 partitionData  代码

    /**
     * 分割数据列表成多个小块。
     * @param dataList 待分割的数据列表,包含微信客户链接详情。
     * @param partitionSize 每个分区的大小。
     * @return 分割后的数据列表,每个元素是一个分区,分区内部保持原有顺序。
     */
    private List<List<WechatCustomerLinkDetail>> partitionData(List<WechatCustomerLinkDetail> dataList, int partitionSize) {
        List<List<WechatCustomerLinkDetail>> partitions = new ArrayList<>();

        // 总数据量
        int size = dataList.size();
        // 每个分区的实际大小,整除操作保证每个分区大小尽量均匀
        int batchSize = size / partitionSize;

        // 遍历分区数量次,为每个分区添加数据
        for (int i = 0; i < partitionSize; i++) {
            // 当前分区的起始索引
            int fromIndex = i * batchSize;
            // 当前分区的结束索引,如果是最后一个分区,则包含所有剩余数据
            int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;

            // 将当前分区的数据添加到分区列表中
            partitions.add(dataList.subList(fromIndex, toIndex));
        }
        return partitions;
    }

到这里整个基于多线程处理数据的代码就整理完了,代码结构并不复杂,主要是注意数据查询以及服务器最大线程数相关数据,防止线程不够用的情况。

相关文章
|
21天前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
Java 数据库 Spring
59 0
|
27天前
|
Java API 开发工具
【Azure Developer】Java代码实现获取Azure 资源的指标数据却报错 "invalid time interval input"
在使用 Java 调用虚拟机 API 获取指标数据时,因本地时区设置非 UTC,导致时间格式解析错误。解决方法是在代码中手动指定时区为 UTC,使用 `ZoneOffset.ofHours(0)` 并结合 `withOffsetSameInstant` 方法进行时区转换,从而避免因时区差异引发的时间格式问题。
133 3
|
1月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
90 16
|
2月前
|
缓存 并行计算 安全
关于Java多线程详解
本文深入讲解Java多线程编程,涵盖基础概念、线程创建与管理、同步机制、并发工具类、线程池、线程安全集合、实战案例及常见问题解决方案,助你掌握高性能并发编程技巧,应对多线程开发中的挑战。
|
2月前
|
数据采集 JSON Java
Java爬虫获取1688店铺所有商品接口数据实战指南
本文介绍如何使用Java爬虫技术高效获取1688店铺商品信息,涵盖环境搭建、API调用、签名生成及数据抓取全流程,并附完整代码示例,助力市场分析与选品决策。
|
2月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
3月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
3月前
|
Java 索引
多线程向设备发送数据
多线程向设备发送数据
60 0
|
3月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
309 83