使用线程池多线程优化大数据量项目 ✨ 每日积累

简介: 使用线程池多线程优化大数据量项目 ✨ 每日积累

背景


当项目中有获取n个模块的信息,之后进行多个模块信息合并的操作,可以使用多线程来实现,开启多个异步线程哪区多个模块数据。例如获取用户基础信息和获取用户账号信息,他们分别处于不同的表或者不同的数据库中。


图示如下


1.png

模拟代码

用户合并信息bean

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
public class UserInfo implements Serializable {
    private int userId;
    private String userName;
    private String sex;
    private int age;
    private String password;
    private String hobby;
    private String phoneNumber;
    private BigInteger accountId;
    private BigDecimal accountDeposit;
    private String bankOfAccountId;
    private UserInfo(UserBuilder userBuilder){
        this.userId = userBuilder.userBaseInfo.getUserId();
        if (userBuilder.userBaseInfo.getUserName() != null) this.userName = userBuilder.userBaseInfo.getUserName();
        if (userBuilder.userBaseInfo.getSex() != null) this.sex = userBuilder.userBaseInfo.getSex();
        this.age = userBuilder.userBaseInfo.getAge();
        if (userBuilder.userBaseInfo.getHobby() != null)this.hobby = userBuilder.userBaseInfo.getHobby();
        if (userBuilder.userBaseInfo.getPassword() != null) this.password = userBuilder.userBaseInfo.getPassword();
        if (userBuilder.userBaseInfo.getPhoneNumber() != null) this.phoneNumber = userBuilder.userBaseInfo.getPhoneNumber();
        if (userBuilder.userAccountInfo.getAccountId() != null) this.accountId = userBuilder.userAccountInfo.getAccountId();
        if (userBuilder.userAccountInfo.getBankOfAccountId() != null) this.bankOfAccountId = userBuilder.userAccountInfo.getBankOfAccountId();
        if (userBuilder.userAccountInfo.getAccountDeposit() != null) this.accountDeposit = userBuilder.userAccountInfo.getAccountDeposit();
    }
    /**
     * BeanBuilder能保证像重叠构造器模式那样的安全性,
     * 也能保证像JavaBeans模式那么好的可读性。这就是Builder模式的一种形式,
     * 不直接生成想要的对象,而是让客户端利用所有必要的参数调用构造器(或者静态工厂),
     * 得到一个builder对象。然后客户端在builder对象上调用类似于setter的方法,
     * 来设置每个相关的可选参数。最后,客户端调用无参的builder方法来生成不可变的对象。
     * 这个builder是它构建类的静态成员类。
     *
     */
    public static class UserBuilder{
        private UserBaseInfo userBaseInfo;
        private UserAccountInfo userAccountInfo;
        public UserBuilder() {
        }
        public UserBuilder userBaseInfo(UserBaseInfo val) {
            this.userBaseInfo = val;
            return this;
        }
        public UserBuilder userAccountInfo(UserAccountInfo val) {
            this.userAccountInfo = val;
            return this;
        }
        public UserInfo build() {
            return new UserInfo(this);
        }
    }
}

用户基础信息bean

public class UserBaseInfo {
    private int userId;
    private String userName;
    private String sex;
    private int age;
    private String password;
    private String hobby;
    private String phoneNumber;
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public String getUserName() {
        return userName;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
    public String getSex() {
        return sex;
    }
    public void setSex(String sex) {
        this.sex = sex;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public String getHobby() {
        return hobby;
    }
    public void setHobby(String hobby) {
        this.hobby = hobby;
    }
    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
}

用户账号信息bean

import java.math.BigDecimal;
import java.math.BigInteger;
public class UserAccountInfo {
    private int userId;
    private BigInteger accountId;
    private BigDecimal accountDeposit;
    private String bankOfAccountId;
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public BigInteger getAccountId() {
        return accountId;
    }
    public void setAccountId(BigInteger accountId) {
        this.accountId = accountId;
    }
    public BigDecimal getAccountDeposit() {
        return accountDeposit;
    }
    public void setAccountDeposit(BigDecimal accountDeposit) {
        this.accountDeposit = accountDeposit;
    }
    public String getBankOfAccountId() {
        return bankOfAccountId;
    }
    public void setBankOfAccountId(String bankOfAccountId) {
        this.bankOfAccountId = bankOfAccountId;
    }
}

用户service接口

import java.util.List;
public interface UserService {
    List<UserBaseInfo> getUserInfos();
    List<UserAccountInfo> getUserAccountInfos();
}

用户service接口实现类

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
public class UserServiceImpl implements UserService{
    @Override
    public List<UserBaseInfo> getUserInfos() {
        Long startTime = System.currentTimeMillis();
        List<UserBaseInfo> userBaseInfoList = new ArrayList<>();
        UserBaseInfo userBaseInfo = null;
        for (int i = 0; i < 5000; i++) {
            userBaseInfo = new UserBaseInfo();
            userBaseInfo.setUserId(i);
            userBaseInfo.setUserName("测试账号" + i);
            userBaseInfo.setAge(18);
            userBaseInfo.setHobby("唱、跳、rap、篮球");
            userBaseInfo.setPassword("Test123!=" + i);
            userBaseInfo.setPhoneNumber("13111111111");
            userBaseInfo.setSex("无");
            userBaseInfoList.add(userBaseInfo);
        }
        System.out.println("userBaseInfoList size:" + userBaseInfoList.size());
        System.out.println("获取用户base信息用时:" + (System.currentTimeMillis() - startTime));
        return userBaseInfoList;
    }
    @Override
    public List<UserAccountInfo> getUserAccountInfos() {
        Long startTime = System.currentTimeMillis();
        List<UserAccountInfo> userAccountInfoList = new ArrayList<>();
        UserAccountInfo userAccountInfo = null;
        for (int i = 0; i < 5000; i++) {
            userAccountInfo = new UserAccountInfo();
            userAccountInfo.setUserId(i);
            userAccountInfo.setAccountId(new BigInteger(String.valueOf(10000000 + i)));
            userAccountInfo.setAccountDeposit(new BigDecimal("1314520" + i));
            userAccountInfo.setBankOfAccountId("中国银行");
            userAccountInfoList.add(userAccountInfo);
        }
        System.out.println("获取用户account信息用时:" + (System.currentTimeMillis() - startTime));
        return userAccountInfoList;
    }
}

模拟客户端测试

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestClient {
    public static void main(String[] args) {
        final UserService userBaseService = new UserServiceImpl();
        long startTime = System.currentTimeMillis();
        //单线程串行运行
        //获取用户账户基本信息
        List<UserAccountInfo> userAccountInfos = userBaseService.getUserAccountInfos();
        //获取用户基本信息
        List<UserBaseInfo> userInfos = userBaseService.getUserInfos();
        System.out.println("单线程获取用户信息运行耗时:" + (System.currentTimeMillis() - startTime));
        //合并用户账号和基本信息来完善用户信息
        List<UserInfo> userInfoList = new ArrayList<>();
        setUserInfoList(userAccountInfos, userInfos, userInfoList);
        //清空list
        userInfoList.clear();
        System.out.println("-------------------------");
        long startTimeV2 = System.currentTimeMillis();
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        List<UserBaseInfo> userBaseInfoList = new ArrayList<>();
        List<UserAccountInfo> userAccountInfoArrayList = new ArrayList<>();
        try {
            userBaseInfoList = threadPoolExecutor.submit(new Callable<List<UserBaseInfo>>() {
                @Override
                public List<UserBaseInfo> call() throws Exception {
                    return userBaseService.getUserInfos();
                }
            }).get(); //阻塞主线程,等待线程池线程执行完毕
            userAccountInfoArrayList = threadPoolExecutor.submit(new Callable<List<UserAccountInfo>>() {
                @Override
                public List<UserAccountInfo> call() throws Exception {
                    return userBaseService.getUserAccountInfos();
                }
            }).get();//阻塞主线程,等待线程池线程执行完毕
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        threadPoolExecutor.shutdown();
        System.out.println("多线程运行耗时:" + (System.currentTimeMillis() - startTimeV2));
        setUserInfoList(userAccountInfoArrayList, userBaseInfoList, userInfoList);
    }
    /**
     * 用户完整信息装配
     * @param userAccountInfos
     * @param userBaseInfos
     * @param userInfoList
     */
    public static void setUserInfoList(List<UserAccountInfo> userAccountInfos,  List<UserBaseInfo> userBaseInfos, List<UserInfo> userInfoList){
        Long startTime = System.currentTimeMillis();
        userBaseInfos.stream().forEach(userInfo -> {
            userAccountInfos.stream().forEach(userAccountInfo -> {
                if (userInfo.getUserId() == userAccountInfo.getUserId()) {
                    userInfoList.add(new UserInfo.UserBuilder().userBaseInfo(userInfo).userAccountInfo(userAccountInfo).build());
                }
            });
        });
//        for (int i = 0; i < userBaseInfos.size(); i++) {
//            for (int i1 = i; i1 < userAccountInfos.size(); i1++) {
//                if (userBaseInfos.get(i).getUserId() == userAccountInfos.get(i1).getUserId()) {
//                    userInfoList.add(new UserInfo.UserBuilder().userBaseInfo(userBaseInfos.get(i)).userAccountInfo(userAccountInfos.get(i1)).build());
//                }
//            }
//        }
        System.out.println("用户完整信息装配花费:" + (System.currentTimeMillis() - startTime));
    }
}

运行结果

1.png

如上图所示,如果是大数据量下,优化效果是很显著的,使用自定义线程池、Callable和FutureTask结合使用,FutureTask中有get()方法,能阻塞住线程,等执行完了返回一个结果,主线程拿到结果才能继续往下执行。
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
8天前
|
缓存 Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第30天】本文将深入探讨Java并发编程中的一个重要主题——线程池。我们将详细解析线程池的概念、应用及其优化方法,帮助读者更好地理解和使用线程池,提高程序的性能和效率。
|
2天前
|
算法 安全 Java
Java性能优化(四)-多线程调优-Synchronized优化
JVM在JDK1.6中引入了分级锁机制来优化Synchronized,当一个线程获取锁时,首先对象锁将成为一个偏向锁,这样做是为了优化同一线程重复获取导致的用户态与内核态的切换问题;其次如果有多个线程竞争锁资源,锁将会升级为轻量级锁,它适用于在短时间内持有锁,且分锁有交替切换的场景;轻量级锁还使用了自旋锁来避免线程用户态与内核态的频繁切换,大大地提高了系统性能;但如果锁竞争太激烈了,那么同步锁将会升级为重量级锁。减少锁竞争,是优化Synchronized同步锁的关键。
19 2
|
7天前
|
缓存 监控 安全
Java的线程池和线程安全
Java的线程池和线程安全
|
1天前
|
监控 算法 Java
Java性能优化(九)-多线程调优-垃圾回收机制优化
Java性能优化(九)-多线程调优-垃圾回收机制优化
10 0
|
1天前
|
缓存 Java 测试技术
Java性能优化(八)-多线程调优-线程池大小设置
Java性能优化(八)-多线程调优-线程池大小设置
4 0
|
1天前
|
安全 Java 大数据
Java性能优化(七)-多线程调优-并发容器的使用
Java性能优化(七)-多线程调优-并发容器的使用
13 0
|
1天前
|
缓存 算法 安全
Java性能优化(六)-多线程调优-乐观锁
Java性能优化(六)-多线程调优-乐观锁
8 0
|
2天前
|
算法 安全 Java
Java性能优化(五)-多线程调优-Lock同步锁的优化
基本特点Lock锁的基本操作通常基于乐观锁实现,尽管在某些情况下(如阻塞时)它也可能采用悲观锁的策略。通过对比图,我们可以清晰地看到两种同步锁的基本特点。Lock同步锁与Synchronized的比较在Java中,同步锁机制是确保多线程安全访问共享资源的重要手段。与JVM隐式管理锁的Synchronized相比,Lock同步锁(以下简称Lock锁)提供了更细粒度的控制,通过显式地获取和释放锁,为开发者提供了更大的灵活性。一、基本特点。
13 1
|
3天前
|
安全 Java 容器
多线程(进阶四:线程安全的集合类)
多线程(进阶四:线程安全的集合类)
11 0
|
3天前
|
开发框架 监控 Java
【.NET Core】多线程之线程池(ThreadPool)详解(二)
【.NET Core】多线程之线程池(ThreadPool)详解(二)
24 3