使用线程池多线程优化大数据量项目 ✨ 每日积累

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 使用线程池多线程优化大数据量项目 ✨ 每日积累

背景


当项目中有获取n个模块的信息,之后进行多个模块信息合并的操作,可以使用多线程来实现,开启多个异步线程哪区多个模块数据。例如获取用户基础信息和获取用户账号信息,他们分别处于不同的表或者不同的数据库中。


图示如下


1.png

模拟代码

用户合并信息bean

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
public class UserInfo implements Serializable {
    private int userId;
    private String userName;
    private String sex;
    private int age;
    private String password;
    private String hobby;
    private String phoneNumber;
    private BigInteger accountId;
    private BigDecimal accountDeposit;
    private String bankOfAccountId;
    private UserInfo(UserBuilder userBuilder){
        this.userId = userBuilder.userBaseInfo.getUserId();
        if (userBuilder.userBaseInfo.getUserName() != null) this.userName = userBuilder.userBaseInfo.getUserName();
        if (userBuilder.userBaseInfo.getSex() != null) this.sex = userBuilder.userBaseInfo.getSex();
        this.age = userBuilder.userBaseInfo.getAge();
        if (userBuilder.userBaseInfo.getHobby() != null)this.hobby = userBuilder.userBaseInfo.getHobby();
        if (userBuilder.userBaseInfo.getPassword() != null) this.password = userBuilder.userBaseInfo.getPassword();
        if (userBuilder.userBaseInfo.getPhoneNumber() != null) this.phoneNumber = userBuilder.userBaseInfo.getPhoneNumber();
        if (userBuilder.userAccountInfo.getAccountId() != null) this.accountId = userBuilder.userAccountInfo.getAccountId();
        if (userBuilder.userAccountInfo.getBankOfAccountId() != null) this.bankOfAccountId = userBuilder.userAccountInfo.getBankOfAccountId();
        if (userBuilder.userAccountInfo.getAccountDeposit() != null) this.accountDeposit = userBuilder.userAccountInfo.getAccountDeposit();
    }
    /**
     * BeanBuilder能保证像重叠构造器模式那样的安全性,
     * 也能保证像JavaBeans模式那么好的可读性。这就是Builder模式的一种形式,
     * 不直接生成想要的对象,而是让客户端利用所有必要的参数调用构造器(或者静态工厂),
     * 得到一个builder对象。然后客户端在builder对象上调用类似于setter的方法,
     * 来设置每个相关的可选参数。最后,客户端调用无参的builder方法来生成不可变的对象。
     * 这个builder是它构建类的静态成员类。
     *
     */
    public static class UserBuilder{
        private UserBaseInfo userBaseInfo;
        private UserAccountInfo userAccountInfo;
        public UserBuilder() {
        }
        public UserBuilder userBaseInfo(UserBaseInfo val) {
            this.userBaseInfo = val;
            return this;
        }
        public UserBuilder userAccountInfo(UserAccountInfo val) {
            this.userAccountInfo = val;
            return this;
        }
        public UserInfo build() {
            return new UserInfo(this);
        }
    }
}

用户基础信息bean

public class UserBaseInfo {
    private int userId;
    private String userName;
    private String sex;
    private int age;
    private String password;
    private String hobby;
    private String phoneNumber;
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public String getUserName() {
        return userName;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
    public String getSex() {
        return sex;
    }
    public void setSex(String sex) {
        this.sex = sex;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public String getHobby() {
        return hobby;
    }
    public void setHobby(String hobby) {
        this.hobby = hobby;
    }
    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
}

用户账号信息bean

import java.math.BigDecimal;
import java.math.BigInteger;
public class UserAccountInfo {
    private int userId;
    private BigInteger accountId;
    private BigDecimal accountDeposit;
    private String bankOfAccountId;
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public BigInteger getAccountId() {
        return accountId;
    }
    public void setAccountId(BigInteger accountId) {
        this.accountId = accountId;
    }
    public BigDecimal getAccountDeposit() {
        return accountDeposit;
    }
    public void setAccountDeposit(BigDecimal accountDeposit) {
        this.accountDeposit = accountDeposit;
    }
    public String getBankOfAccountId() {
        return bankOfAccountId;
    }
    public void setBankOfAccountId(String bankOfAccountId) {
        this.bankOfAccountId = bankOfAccountId;
    }
}

用户service接口

import java.util.List;
public interface UserService {
    List<UserBaseInfo> getUserInfos();
    List<UserAccountInfo> getUserAccountInfos();
}

用户service接口实现类

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
public class UserServiceImpl implements UserService{
    @Override
    public List<UserBaseInfo> getUserInfos() {
        Long startTime = System.currentTimeMillis();
        List<UserBaseInfo> userBaseInfoList = new ArrayList<>();
        UserBaseInfo userBaseInfo = null;
        for (int i = 0; i < 5000; i++) {
            userBaseInfo = new UserBaseInfo();
            userBaseInfo.setUserId(i);
            userBaseInfo.setUserName("测试账号" + i);
            userBaseInfo.setAge(18);
            userBaseInfo.setHobby("唱、跳、rap、篮球");
            userBaseInfo.setPassword("Test123!=" + i);
            userBaseInfo.setPhoneNumber("13111111111");
            userBaseInfo.setSex("无");
            userBaseInfoList.add(userBaseInfo);
        }
        System.out.println("userBaseInfoList size:" + userBaseInfoList.size());
        System.out.println("获取用户base信息用时:" + (System.currentTimeMillis() - startTime));
        return userBaseInfoList;
    }
    @Override
    public List<UserAccountInfo> getUserAccountInfos() {
        Long startTime = System.currentTimeMillis();
        List<UserAccountInfo> userAccountInfoList = new ArrayList<>();
        UserAccountInfo userAccountInfo = null;
        for (int i = 0; i < 5000; i++) {
            userAccountInfo = new UserAccountInfo();
            userAccountInfo.setUserId(i);
            userAccountInfo.setAccountId(new BigInteger(String.valueOf(10000000 + i)));
            userAccountInfo.setAccountDeposit(new BigDecimal("1314520" + i));
            userAccountInfo.setBankOfAccountId("中国银行");
            userAccountInfoList.add(userAccountInfo);
        }
        System.out.println("获取用户account信息用时:" + (System.currentTimeMillis() - startTime));
        return userAccountInfoList;
    }
}

模拟客户端测试

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestClient {
    public static void main(String[] args) {
        final UserService userBaseService = new UserServiceImpl();
        long startTime = System.currentTimeMillis();
        //单线程串行运行
        //获取用户账户基本信息
        List<UserAccountInfo> userAccountInfos = userBaseService.getUserAccountInfos();
        //获取用户基本信息
        List<UserBaseInfo> userInfos = userBaseService.getUserInfos();
        System.out.println("单线程获取用户信息运行耗时:" + (System.currentTimeMillis() - startTime));
        //合并用户账号和基本信息来完善用户信息
        List<UserInfo> userInfoList = new ArrayList<>();
        setUserInfoList(userAccountInfos, userInfos, userInfoList);
        //清空list
        userInfoList.clear();
        System.out.println("-------------------------");
        long startTimeV2 = System.currentTimeMillis();
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        List<UserBaseInfo> userBaseInfoList = new ArrayList<>();
        List<UserAccountInfo> userAccountInfoArrayList = new ArrayList<>();
        try {
            userBaseInfoList = threadPoolExecutor.submit(new Callable<List<UserBaseInfo>>() {
                @Override
                public List<UserBaseInfo> call() throws Exception {
                    return userBaseService.getUserInfos();
                }
            }).get(); //阻塞主线程,等待线程池线程执行完毕
            userAccountInfoArrayList = threadPoolExecutor.submit(new Callable<List<UserAccountInfo>>() {
                @Override
                public List<UserAccountInfo> call() throws Exception {
                    return userBaseService.getUserAccountInfos();
                }
            }).get();//阻塞主线程,等待线程池线程执行完毕
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        threadPoolExecutor.shutdown();
        System.out.println("多线程运行耗时:" + (System.currentTimeMillis() - startTimeV2));
        setUserInfoList(userAccountInfoArrayList, userBaseInfoList, userInfoList);
    }
    /**
     * 用户完整信息装配
     * @param userAccountInfos
     * @param userBaseInfos
     * @param userInfoList
     */
    public static void setUserInfoList(List<UserAccountInfo> userAccountInfos,  List<UserBaseInfo> userBaseInfos, List<UserInfo> userInfoList){
        Long startTime = System.currentTimeMillis();
        userBaseInfos.stream().forEach(userInfo -> {
            userAccountInfos.stream().forEach(userAccountInfo -> {
                if (userInfo.getUserId() == userAccountInfo.getUserId()) {
                    userInfoList.add(new UserInfo.UserBuilder().userBaseInfo(userInfo).userAccountInfo(userAccountInfo).build());
                }
            });
        });
//        for (int i = 0; i < userBaseInfos.size(); i++) {
//            for (int i1 = i; i1 < userAccountInfos.size(); i1++) {
//                if (userBaseInfos.get(i).getUserId() == userAccountInfos.get(i1).getUserId()) {
//                    userInfoList.add(new UserInfo.UserBuilder().userBaseInfo(userBaseInfos.get(i)).userAccountInfo(userAccountInfos.get(i1)).build());
//                }
//            }
//        }
        System.out.println("用户完整信息装配花费:" + (System.currentTimeMillis() - startTime));
    }
}

运行结果

1.png

如上图所示,如果是大数据量下,优化效果是很显著的,使用自定义线程池、Callable和FutureTask结合使用,FutureTask中有get()方法,能阻塞住线程,等执行完了返回一个结果,主线程拿到结果才能继续往下执行。
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
3天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
71 38
|
1天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
9 2
|
3天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
24 4
|
3天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
30 2
|
6天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
11 3
|
6天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
9 2
|
6天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
15 2
|
6天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
16 1
|
6天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
15 1
|
6天前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
13 1