使用线程池多线程优化大数据量项目 ✨ 每日积累

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 使用线程池多线程优化大数据量项目 ✨ 每日积累

背景


当项目中有获取n个模块的信息,之后进行多个模块信息合并的操作,可以使用多线程来实现,开启多个异步线程哪区多个模块数据。例如获取用户基础信息和获取用户账号信息,他们分别处于不同的表或者不同的数据库中。


图示如下


1.png

模拟代码

用户合并信息bean

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
public class UserInfo implements Serializable {
    private int userId;
    private String userName;
    private String sex;
    private int age;
    private String password;
    private String hobby;
    private String phoneNumber;
    private BigInteger accountId;
    private BigDecimal accountDeposit;
    private String bankOfAccountId;
    private UserInfo(UserBuilder userBuilder){
        this.userId = userBuilder.userBaseInfo.getUserId();
        if (userBuilder.userBaseInfo.getUserName() != null) this.userName = userBuilder.userBaseInfo.getUserName();
        if (userBuilder.userBaseInfo.getSex() != null) this.sex = userBuilder.userBaseInfo.getSex();
        this.age = userBuilder.userBaseInfo.getAge();
        if (userBuilder.userBaseInfo.getHobby() != null)this.hobby = userBuilder.userBaseInfo.getHobby();
        if (userBuilder.userBaseInfo.getPassword() != null) this.password = userBuilder.userBaseInfo.getPassword();
        if (userBuilder.userBaseInfo.getPhoneNumber() != null) this.phoneNumber = userBuilder.userBaseInfo.getPhoneNumber();
        if (userBuilder.userAccountInfo.getAccountId() != null) this.accountId = userBuilder.userAccountInfo.getAccountId();
        if (userBuilder.userAccountInfo.getBankOfAccountId() != null) this.bankOfAccountId = userBuilder.userAccountInfo.getBankOfAccountId();
        if (userBuilder.userAccountInfo.getAccountDeposit() != null) this.accountDeposit = userBuilder.userAccountInfo.getAccountDeposit();
    }
    /**
     * BeanBuilder能保证像重叠构造器模式那样的安全性,
     * 也能保证像JavaBeans模式那么好的可读性。这就是Builder模式的一种形式,
     * 不直接生成想要的对象,而是让客户端利用所有必要的参数调用构造器(或者静态工厂),
     * 得到一个builder对象。然后客户端在builder对象上调用类似于setter的方法,
     * 来设置每个相关的可选参数。最后,客户端调用无参的builder方法来生成不可变的对象。
     * 这个builder是它构建类的静态成员类。
     *
     */
    public static class UserBuilder{
        private UserBaseInfo userBaseInfo;
        private UserAccountInfo userAccountInfo;
        public UserBuilder() {
        }
        public UserBuilder userBaseInfo(UserBaseInfo val) {
            this.userBaseInfo = val;
            return this;
        }
        public UserBuilder userAccountInfo(UserAccountInfo val) {
            this.userAccountInfo = val;
            return this;
        }
        public UserInfo build() {
            return new UserInfo(this);
        }
    }
}

用户基础信息bean

public class UserBaseInfo {
    private int userId;
    private String userName;
    private String sex;
    private int age;
    private String password;
    private String hobby;
    private String phoneNumber;
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public String getUserName() {
        return userName;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
    public String getSex() {
        return sex;
    }
    public void setSex(String sex) {
        this.sex = sex;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public String getHobby() {
        return hobby;
    }
    public void setHobby(String hobby) {
        this.hobby = hobby;
    }
    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
}

用户账号信息bean

import java.math.BigDecimal;
import java.math.BigInteger;
public class UserAccountInfo {
    private int userId;
    private BigInteger accountId;
    private BigDecimal accountDeposit;
    private String bankOfAccountId;
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public BigInteger getAccountId() {
        return accountId;
    }
    public void setAccountId(BigInteger accountId) {
        this.accountId = accountId;
    }
    public BigDecimal getAccountDeposit() {
        return accountDeposit;
    }
    public void setAccountDeposit(BigDecimal accountDeposit) {
        this.accountDeposit = accountDeposit;
    }
    public String getBankOfAccountId() {
        return bankOfAccountId;
    }
    public void setBankOfAccountId(String bankOfAccountId) {
        this.bankOfAccountId = bankOfAccountId;
    }
}

用户service接口

import java.util.List;
public interface UserService {
    List<UserBaseInfo> getUserInfos();
    List<UserAccountInfo> getUserAccountInfos();
}

用户service接口实现类

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
public class UserServiceImpl implements UserService{
    @Override
    public List<UserBaseInfo> getUserInfos() {
        Long startTime = System.currentTimeMillis();
        List<UserBaseInfo> userBaseInfoList = new ArrayList<>();
        UserBaseInfo userBaseInfo = null;
        for (int i = 0; i < 5000; i++) {
            userBaseInfo = new UserBaseInfo();
            userBaseInfo.setUserId(i);
            userBaseInfo.setUserName("测试账号" + i);
            userBaseInfo.setAge(18);
            userBaseInfo.setHobby("唱、跳、rap、篮球");
            userBaseInfo.setPassword("Test123!=" + i);
            userBaseInfo.setPhoneNumber("13111111111");
            userBaseInfo.setSex("无");
            userBaseInfoList.add(userBaseInfo);
        }
        System.out.println("userBaseInfoList size:" + userBaseInfoList.size());
        System.out.println("获取用户base信息用时:" + (System.currentTimeMillis() - startTime));
        return userBaseInfoList;
    }
    @Override
    public List<UserAccountInfo> getUserAccountInfos() {
        Long startTime = System.currentTimeMillis();
        List<UserAccountInfo> userAccountInfoList = new ArrayList<>();
        UserAccountInfo userAccountInfo = null;
        for (int i = 0; i < 5000; i++) {
            userAccountInfo = new UserAccountInfo();
            userAccountInfo.setUserId(i);
            userAccountInfo.setAccountId(new BigInteger(String.valueOf(10000000 + i)));
            userAccountInfo.setAccountDeposit(new BigDecimal("1314520" + i));
            userAccountInfo.setBankOfAccountId("中国银行");
            userAccountInfoList.add(userAccountInfo);
        }
        System.out.println("获取用户account信息用时:" + (System.currentTimeMillis() - startTime));
        return userAccountInfoList;
    }
}

模拟客户端测试

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestClient {
    public static void main(String[] args) {
        final UserService userBaseService = new UserServiceImpl();
        long startTime = System.currentTimeMillis();
        //单线程串行运行
        //获取用户账户基本信息
        List<UserAccountInfo> userAccountInfos = userBaseService.getUserAccountInfos();
        //获取用户基本信息
        List<UserBaseInfo> userInfos = userBaseService.getUserInfos();
        System.out.println("单线程获取用户信息运行耗时:" + (System.currentTimeMillis() - startTime));
        //合并用户账号和基本信息来完善用户信息
        List<UserInfo> userInfoList = new ArrayList<>();
        setUserInfoList(userAccountInfos, userInfos, userInfoList);
        //清空list
        userInfoList.clear();
        System.out.println("-------------------------");
        long startTimeV2 = System.currentTimeMillis();
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        List<UserBaseInfo> userBaseInfoList = new ArrayList<>();
        List<UserAccountInfo> userAccountInfoArrayList = new ArrayList<>();
        try {
            userBaseInfoList = threadPoolExecutor.submit(new Callable<List<UserBaseInfo>>() {
                @Override
                public List<UserBaseInfo> call() throws Exception {
                    return userBaseService.getUserInfos();
                }
            }).get(); //阻塞主线程,等待线程池线程执行完毕
            userAccountInfoArrayList = threadPoolExecutor.submit(new Callable<List<UserAccountInfo>>() {
                @Override
                public List<UserAccountInfo> call() throws Exception {
                    return userBaseService.getUserAccountInfos();
                }
            }).get();//阻塞主线程,等待线程池线程执行完毕
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        threadPoolExecutor.shutdown();
        System.out.println("多线程运行耗时:" + (System.currentTimeMillis() - startTimeV2));
        setUserInfoList(userAccountInfoArrayList, userBaseInfoList, userInfoList);
    }
    /**
     * 用户完整信息装配
     * @param userAccountInfos
     * @param userBaseInfos
     * @param userInfoList
     */
    public static void setUserInfoList(List<UserAccountInfo> userAccountInfos,  List<UserBaseInfo> userBaseInfos, List<UserInfo> userInfoList){
        Long startTime = System.currentTimeMillis();
        userBaseInfos.stream().forEach(userInfo -> {
            userAccountInfos.stream().forEach(userAccountInfo -> {
                if (userInfo.getUserId() == userAccountInfo.getUserId()) {
                    userInfoList.add(new UserInfo.UserBuilder().userBaseInfo(userInfo).userAccountInfo(userAccountInfo).build());
                }
            });
        });
//        for (int i = 0; i < userBaseInfos.size(); i++) {
//            for (int i1 = i; i1 < userAccountInfos.size(); i1++) {
//                if (userBaseInfos.get(i).getUserId() == userAccountInfos.get(i1).getUserId()) {
//                    userInfoList.add(new UserInfo.UserBuilder().userBaseInfo(userBaseInfos.get(i)).userAccountInfo(userAccountInfos.get(i1)).build());
//                }
//            }
//        }
        System.out.println("用户完整信息装配花费:" + (System.currentTimeMillis() - startTime));
    }
}

运行结果

1.png

如上图所示,如果是大数据量下,优化效果是很显著的,使用自定义线程池、Callable和FutureTask结合使用,FutureTask中有get()方法,能阻塞住线程,等执行完了返回一个结果,主线程拿到结果才能继续往下执行。
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
70 2
|
18小时前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
2月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
229 64
|
24天前
|
并行计算 算法 安全
面试必问的多线程优化技巧与实战
多线程编程是现代软件开发中不可或缺的一部分,特别是在处理高并发场景和优化程序性能时。作为Java开发者,掌握多线程优化技巧不仅能够提升程序的执行效率,还能在面试中脱颖而出。本文将从多线程基础、线程与进程的区别、多线程的优势出发,深入探讨如何避免死锁与竞态条件、线程间的通信机制、线程池的使用优势、线程优化算法与数据结构的选择,以及硬件加速技术。通过多个Java示例,我们将揭示这些技术的底层原理与实现方法。
76 3
|
26天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
56 1
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
126 38
|
2月前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
89 4
|
2月前
|
存储 算法 固态存储
大数据分区优化存储成本
大数据分区优化存储成本
44 4
|
2月前
|
存储 大数据 Serverless
大数据增加分区优化资源使用
大数据增加分区优化资源使用
46 1