基于Java多线程处理数据

简介: 【4月更文挑战第9天】 基于Java多线程处理数据

背景

在日常工作中,有一个同步企微客户-学员关系接口的定时任务在执行中随着数据量的不断增长,定时任务的执行结束时间也出现了当天执行不完的情况,影响到了正常业务的运行。基于这种情况,在对该定时任务的业务逻辑代码分析验证后得出是调用企微客户-学员关系接口时耗时引起的,但是查阅企微接口文档,又不支持批量调用,只能逐个调用。那么这种情况下既然批量调用接口不支持,那么可以采用多线程并发调用的方式来降低定时任务整体的执行时间,于是就需要用到线程池来进行多线程操作。

代码实现

在这里我将会使用spring自带的线程池类ThreadPoolTaskExecutor来进行处理,ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理,源代码中可以看到

image.png

而线程池类ThreadPoolExecutor 是JDK的线程池类,继承 AbstractExecutorService ,

public class ThreadPoolExecutor extends AbstractExecutorService {

AbstractExecutorService 实现 ExecutorService,

public abstract class AbstractExecutorService implements ExecutorService {

ExecutorService 继承 Executor

public interface ExecutorService extends Executor {

下面开始初始化线程池类 ThreadPoolTaskExecutor,配置类 ThreadPoolConfig 代码如下

/**
 * 线程池配置
 *
 **/
@Configuration
public class ThreadPoolConfig
{
    // 核心线程池大小
    private int corePoolSize = 50;

    // 最大可创建的线程数
    private int maxPoolSize = 200;

    // 队列最大长度
    private int queueCapacity = 1000;

    // 线程池维护线程所允许的空闲时间
    private int keepAliveSeconds = 300;

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

补充同步企微客户-学员关系定时任务 SyncWechatWorkCustomerLinkDetailHandler 代码如下

@Component
@JobHandler("syncWechatWorkCustomerLinkDetailHandler")
public class SyncWechatWorkCustomerLinkDetailHandler extends IJobHandler {

    @Autowired
    private IWechatCustomerLinkDetailService wechatCustomerLinkDetailService;

    @Override
    public ReturnT<String> execute(String params) throws Exception {
        wechatCustomerLinkDetailService.syncWechatWorkCustomerLinkDetail(params);
        return ReturnT.SUCCESS;
    }
}

业务处理实现类 syncWechatWorkCustomerLinkDetail 代码如下

    @Override
    public void syncWechatWorkCustomerLinkDetail(String params) {
        XxlJobLogger.log("补充任务开始执行...[{}]",params);

        //查询条件对象
        WechatCustomerLinkDetail searchparam = new WechatCustomerLinkDetail();
        if (StringUtils.isNotEmpty(params)) {
            Long[] ids = Convert.toLongArray(params);
            //根据ids查询数据
            searchparam.setLinkIds(ids);
        }
        // 分页查询企微获客助手客户链接
        int pageNo = 0;
        int pageSize = 200;
        while(true){
            pageNo++;
            XxlJobLogger.log("第【{}】页数据开始补充...",pageNo);
            PageHelper.startPage(pageNo, pageSize);
            PageHelper.orderBy("id asc");
            List<WechatCustomerLinkDetail> list = wechatCustomerLinkDetailMapper.selectWechatCustomerLinkDetailList(searchparam);
            PageHelper.clearPage();
            if (CollUtil.isEmpty(list) ) {
                break;
            }
            //开始补充数据
            multiThreadProcessData(list);
            XxlJobLogger.log("第【{}】页数据补充完成...",pageNo);
        }
    }

多线程处理列表中的数据类 multiThreadProcessData 代码如下

    /**
     * 使用多线程处理列表中的数据
     * @param list 待处理的微信客户链接详情列表
     */
    public void multiThreadProcessData(List<WechatCustomerLinkDetail> list) {
        // 将大集合分割为多个小集合,以便多线程处理
        List<List<WechatCustomerLinkDetail>> partitionData = partitionData(list, 10);
        // 获取线程池执行器
        ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor");
        // 创建计数器,用于线程同步
        CountDownLatch latch = new CountDownLatch(partitionData.size());

        for (List<WechatCustomerLinkDetail> details : partitionData) {
            // 提交任务给线程池执行,每个任务负责处理一个分割后的列表
            executor.execute(() -> {
                try {
                    for (WechatCustomerLinkDetail detail : details) {
                        //打印线程名称
                        //System.out.println("name========"+Thread.currentThread().getName());
                        // 对每个详情进行处理,填充微信用户名称信息  这里就是业务逻辑处理的地方
                        fillWechatUserNameInfo(detail);
                    }
                } catch (Exception e) {
                    // 捕获异常并打印,避免线程异常中断
                    e.printStackTrace();
                } finally {
                    // 处理完成后,计数器减一,用于线程同步
                    latch.countDown();
                }
            });
        }

        // 等待所有任务完成
        try {
            latch.await();
        } catch (InterruptedException e) {
            // 线程被中断,打印异常信息
            e.printStackTrace();
        }
    }

分割数据列表 partitionData  代码

    /**
     * 分割数据列表成多个小块。
     * @param dataList 待分割的数据列表,包含微信客户链接详情。
     * @param partitionSize 每个分区的大小。
     * @return 分割后的数据列表,每个元素是一个分区,分区内部保持原有顺序。
     */
    private List<List<WechatCustomerLinkDetail>> partitionData(List<WechatCustomerLinkDetail> dataList, int partitionSize) {
        List<List<WechatCustomerLinkDetail>> partitions = new ArrayList<>();

        // 总数据量
        int size = dataList.size();
        // 每个分区的实际大小,整除操作保证每个分区大小尽量均匀
        int batchSize = size / partitionSize;

        // 遍历分区数量次,为每个分区添加数据
        for (int i = 0; i < partitionSize; i++) {
            // 当前分区的起始索引
            int fromIndex = i * batchSize;
            // 当前分区的结束索引,如果是最后一个分区,则包含所有剩余数据
            int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;

            // 将当前分区的数据添加到分区列表中
            partitions.add(dataList.subList(fromIndex, toIndex));
        }
        return partitions;
    }

到这里整个基于多线程处理数据的代码就整理完了,代码结构并不复杂,主要是注意数据查询以及服务器最大线程数相关数据,防止线程不够用的情况。

相关文章
|
1天前
|
Java 调度
Java一分钟之线程池:ExecutorService与Future
【5月更文挑战第12天】Java并发编程中,`ExecutorService`和`Future`是关键组件,简化多线程并提供异步执行能力。`ExecutorService`是线程池接口,用于提交任务到线程池,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。通过`submit()`提交任务并返回`Future`对象,可检查任务状态、获取结果或取消任务。注意处理`ExecutionException`和避免无限等待。实战示例展示了如何异步执行任务并获取结果。理解这些概念对提升并发性能至关重要。
15 5
|
1天前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第12天】 在现代软件开发中,多线程编程是提升应用程序性能和响应能力的关键手段之一。特别是在Java语言中,由于其内置的跨平台线程支持,开发者可以轻松地创建和管理线程。然而,随之而来的并发问题也不容小觑。本文将探讨Java并发编程的核心概念,包括线程安全策略、锁机制以及性能优化技巧。通过实例分析与性能比较,我们旨在为读者提供一套既确保线程安全又兼顾性能的编程指导。
|
2天前
|
Java
Java一分钟:线程协作:wait(), notify(), notifyAll()
【5月更文挑战第11天】本文介绍了Java多线程编程中的`wait()`, `notify()`, `notifyAll()`方法,它们用于线程间通信和同步。这些方法在`synchronized`代码块中使用,控制线程执行和资源访问。文章讨论了常见问题,如死锁、未捕获异常、同步使用错误及通知错误,并提供了生产者-消费者模型的示例代码,强调理解并正确使用这些方法对实现线程协作的重要性。
10 3
|
2天前
|
安全 算法 Java
Java一分钟:线程同步:synchronized关键字
【5月更文挑战第11天】Java中的`synchronized`关键字用于线程同步,防止竞态条件,确保数据一致性。本文介绍了其工作原理、常见问题及避免策略。同步方法和同步代码块是两种使用形式,需注意避免死锁、过度使用导致的性能影响以及理解锁的可重入性和升级降级机制。示例展示了同步方法和代码块的运用,以及如何避免死锁。正确使用`synchronized`是编写多线程安全代码的核心。
53 2
|
2天前
|
安全 Java 调度
Java一分钟:多线程编程初步:Thread类与Runnable接口
【5月更文挑战第11天】本文介绍了Java中创建线程的两种方式:继承Thread类和实现Runnable接口,并讨论了多线程编程中的常见问题,如资源浪费、线程安全、死锁和优先级问题,提出了解决策略。示例展示了线程通信的生产者-消费者模型,强调理解和掌握线程操作对编写高效并发程序的重要性。
39 3
|
2天前
|
SQL Java
java处理数据查看范围
java处理数据查看范围
|
2天前
|
安全 Java
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第11天】在Java并发编程中,线程安全和性能优化是两个重要的主题。本文将深入探讨这两个方面,包括线程安全的基本概念,如何实现线程安全,以及如何在保证线程安全的同时进行性能优化。我们将通过实例和代码片段来说明这些概念和技术。
3 0
|
2天前
|
Java 调度
Java并发编程:深入理解线程池
【5月更文挑战第11天】本文将深入探讨Java中的线程池,包括其基本概念、工作原理以及如何使用。我们将通过实例来解释线程池的优点,如提高性能和资源利用率,以及如何避免常见的并发问题。我们还将讨论Java中线程池的实现,包括Executor框架和ThreadPoolExecutor类,并展示如何创建和管理线程池。最后,我们将讨论线程池的一些高级特性,如任务调度、线程优先级和异常处理。
|
3天前
|
安全 Java
【JAVA进阶篇教学】第十篇:Java中线程安全、锁讲解
【JAVA进阶篇教学】第十篇:Java中线程安全、锁讲解
|
3天前
|
安全 Java
【JAVA进阶篇教学】第六篇:Java线程中状态
【JAVA进阶篇教学】第六篇:Java线程中状态