【已解决】利用 Java 多线程并发编程提高数据处理效率

简介: 【已解决】利用 Java 多线程并发编程提高数据处理效率

🎉工作场景中遇到这样一个需求:根据主机的 IP 地址联动更新其他模型的相关信息。需求很简单,只涉及一般的数据库联动查询以及更新操作,然而在编码实现过程中发现,由于主机的数量很多,导致循环遍历查询、更新时花费很长的时间,调用一次接口大概需要 30-40 min 时间才能完成操作。

💡因此,为了有效缩短接口方法的执行时间,便考虑使用多线程并发编程方法,利用多核处理器并行执行的能力,通过异步处理数据的方式,便可以大大缩短执行时间,提高执行效率。


📍这里使用可重用固定线程数的线程池 FixedThreadPool,并使用 CountDownLatch 并发工具类提供的并发流程控制工具作为配合使用,保证多线程并发编程过程中的正常运行:


首先,通过 Runtime.getRuntime().availableProcessors() 方法获取运行机器的 CPU 线程数,用于后续设置固定线程池的线程数量。

其次,判断任务的特性,如果为计算密集型任务则设置线程数为 CPU 线程数+1,如果为 IO 密集型任务则设置线程数为 2 * CPU 线程数,由于在方法中需要与数据库进行频繁的交互,因此属于 IO 密集型任务。

之后,对数据进行分组切割,每个线程处理一个分组的数据,分组的组数与线程数保持一致,并且还要创建计数器对象 CountDownLatch,调用构造函数,初始化参数值为线程数个数,保证主线程等待所有子线程运行结束后,再进行后续的操作。

然后,调用 executorService.execute() 方法,重写 run 方法编写业务逻辑与数据处理代码,执行完当前线程后记得将计数器减1操作。

最后,当所有子线程执行完成后,关闭线程池。

✨在省略工作场景中的业务逻辑代码后,通用的处理方法示例如下所示:

public ResponseData updateHostDept() {
    // ...
    List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
        // split the hostMapList for the following multi-threads task
        // return the number of logical CPUs
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
        // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
        int threadNum = processorsNum * 2;  
        // the number of each group data 
        int eachGroupNum = hostMapList.size() / threadNum; 
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                int end = mapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i+1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }
        // update data by using multi-threads asynchronously
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2);
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executorService.execute(()->{
                try {
                    for (Map map : group) {
                      // update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                  // let counter minus one 
                    countDownLatch.countDown();  
                }
            });
        }
        try {
          // main thread donnot execute until all child threads finish
            countDownLatch.await();  
        } catch (Exception e) {
            e.printStackTrace();
        }
        // remember to shutdown the threadPool
        executorService.shutdown();  
        return ResponseData.success();
}

🎉那么在使用多线程异步更新的策略后,从当初调用接口所需的大概时间为 30-40 min 下降到了 8-10 min,大大提高了执行效率。


💡需要注意的是,这里使用的 newFixedThreadPool 创建线程池,它有一个缺陷就是,它的阻塞队列默认是一个无界队列,默认值为 Integer.MAX_VALUE 极有可能会造成 OOM 问题。因此,一般可以使用 ThreadPoolExecutor 来创建线程池,自己可以指定等待队列中的线程个数,避免产生 OOM 问题。

public ResponseData updateHostDept() {
    // ...
    List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
        // split the hostMapList for the following multi-threads task
        // return the number of logical CPUs
        int processorsNum = Runtime.getRuntime().availableProcessors();
        // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
        // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
        int threadNum = processorsNum * 2;  
        // the number of each group data 
        int eachGroupNum = hostMapList.size() / threadNum; 
        List<List<Map>> groupList = new ArrayList<>();
        for (int i = 0; i < threadNum; i++) {
            int start = i * eachGroupNum;
            if (i == threadNum - 1) {
                int end = mapList.size();
                groupList.add(hostMapList.subList(start, end));
            } else {
                int end = (i+1) * eachGroupNum;
                groupList.add(hostMapList.subList(start, end));
            }
        }
        // update data by using multi-threads asynchronously
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(100));
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (List<Map> group : groupList) {
            executor.execute(()->{
                try {
                    for (Map map : group) {
                      // update the data in mongodb
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                  // let counter minus one 
                    countDownLatch.countDown();  
                }
            });
        }
        try {
          // main thread donnot execute until all child threads finish
            countDownLatch.await();  
        } catch (Exception e) {
            e.printStackTrace();
        }
        // remember to shutdown the threadPool
        executor.shutdown();  
        return ResponseData.success();
}

在上述的代码中,核心线程数和最大线程数分别为 5 和 8,并没有设置的很大的值,因为如果如果设置的很大,线程间频繁的上下文切换也会增加时间消耗,反而不能最大程度上发挥多线程的优势。至于如何选择合适的参数,需要根据机器的参数以及任务的类型综合考虑决定。


🎉最后补充一点,如果想要通过非编码的方式获取机器的 CPU 线程个数也很简单,windows 系统通过任务管理器,选择 “性能”,便可以查看 CPU 线程个数的情况,如下图所示:2e718f6de6d94987ae2f2182c528d9c8.png

🎉从上图可以看到,我的机器中内核是八个 CPU,但是通过超线程技术一个物理的 CPU 核心可以模拟成两个逻辑 CPU 线程,因此我的机器是支持8核16线程的。

相关文章
|
2天前
|
缓存 负载均衡 安全
|
1天前
|
设计模式 算法 安全
Java编程中的设计模式:提升代码的可维护性和扩展性
【8月更文挑战第19天】在软件开发的世界里,设计模式是解决常见问题的一种优雅方式。本文将深入探讨Java编程语言中常用的几种设计模式,并解释如何通过这些模式来提高代码的可维护性和扩展性。文章不涉及具体的代码实现,而是侧重于理论和实践相结合的方式,为读者提供一种思考和改善现有项目的新视角。
|
1天前
|
安全 Java 测试技术
深入探讨Java安全编程的最佳实践,帮助开发者保障应用的安全性
在网络安全日益重要的今天,确保Java应用的安全性成为了开发者必须面对的课题。本文介绍Java安全编程的最佳实践,包括利用FindBugs等工具进行代码审查、严格验证用户输入以防攻击、运用输出编码避免XSS等漏洞、实施访问控制确保授权访问、采用加密技术保护敏感数据等。此外,还强调了使用最新Java版本、遵循最小权限原则及定期安全测试的重要性。通过这些实践,开发者能有效提升Java应用的安全防护水平。
5 1
|
2天前
|
Java 开发者
在Java编程中,if-else与switch作为核心的条件控制语句,各有千秋。if-else基于条件分支,适用于复杂逻辑;而switch则擅长处理枚举或固定选项列表,提供简洁高效的解决方案
在Java编程中,if-else与switch作为核心的条件控制语句,各有千秋。if-else基于条件分支,适用于复杂逻辑;而switch则擅长处理枚举或固定选项列表,提供简洁高效的解决方案。本文通过技术综述及示例代码,剖析两者在性能上的差异。if-else具有短路特性,但条件增多时JVM会优化提升性能;switch则利用跳转表机制,在处理大量固定选项时表现出色。通过实验对比可见,switch在重复case值处理上通常更快。尽管如此,选择时还需兼顾代码的可读性和维护性。理解这些细节有助于开发者编写出既高效又优雅的Java代码。
7 2
|
1天前
|
Java
java开启线程的四种方法
这篇文章介绍了Java中开启线程的四种方法,包括继承Thread类、实现Runnable接口、实现Callable接口和创建线程池,每种方法都提供了代码实现和测试结果。
java开启线程的四种方法
|
2天前
|
Java 开发者
在Java编程的广阔天地中,if-else与switch语句犹如两位老练的舵手,引领着代码的流向,决定着程序的走向。
在Java编程中,if-else与switch语句是条件判断的两大利器。本文通过丰富的示例,深入浅出地解析两者的特点与应用场景。if-else适用于逻辑复杂的判断,而switch则在处理固定选项或多分支选择时更为高效。从逻辑复杂度、可读性到性能考量,我们将帮助你掌握何时选用哪种语句,让你在编程时更加得心应手。无论面对何种挑战,都能找到最适合的解决方案。
6 1
|
2天前
|
搜索推荐 Java 程序员
在Java编程的旅程中,条件语句是每位开发者不可或缺的伙伴,它如同导航系统,引导着程序根据不同的情况做出响应。
在Java编程中,条件语句是引导程序根据不同情境作出响应的核心工具。本文通过四个案例深入浅出地介绍了如何巧妙运用if-else与switch语句。从基础的用户登录验证到利用switch处理枚举类型,再到条件语句的嵌套与组合,最后探讨了代码的优化与重构。每个案例都旨在帮助开发者提升编码效率与代码质量,无论是初学者还是资深程序员,都能从中获得灵感,让自己的Java代码更加优雅和专业。
5 1
|
2天前
|
Java
在Java编程的广阔天地中,条件语句是控制程序流程、实现逻辑判断的重要工具。
在Java编程中,if-else与switch作为核心条件语句,各具特色。if-else以其高度灵活性,适用于复杂逻辑判断,支持多种条件组合;而switch在多分支选择上表现优异,尤其适合处理枚举类型或固定选项集,通过内部跳转表提高执行效率。两者各有千秋:if-else擅长复杂逻辑,switch则在多分支选择中更胜一筹。理解它们的特点并在合适场景下使用,能够编写出更高效、易读的Java代码。
5 1
|
4天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践
|
4天前
|
设计模式 算法 Java
Java编程中的设计模式:简化复杂性的艺术
在Java的世界中,设计模式如同一位智慧的导师,指引着开发者们在复杂的编码迷宫中找到出口。本文将深入浅出地探讨几种常见的设计模式,通过实例演示如何在Java项目实践中运用这些模式,从而提升代码的可维护性和扩展性。无论你是新手还是资深开发者,这篇文章都将为你打开一扇通往高效编码的大门。
12 1