伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】

简介: 伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】

接口背景

该接口来源于鱼皮大佬的星球项目——伙伴匹配系统,接口的作用是:根据当前登录用户的标签,为其匹配标签接近的用户,快速帮其找到志同道合的人


接口问题说明

当用户量较大的时候(如到达百万级别),伙伴匹配速度较慢,且占用的内存非常大,系统的并发量较低,希望可以通过一些策略来降低内存占用并加快匹配速度。


7d65767ad3c1472b9b4e8f16d5b44711.png

优化策略

鱼皮大佬在项目讲解的时候,已经提出了一些优化策略,如:


1.关掉数据库的查询日志,可以快30秒左右(不同机器提高时间不同,总之提升巨大)

2.查询数据的时候,只查询需要使用的字段,不要直接将所有字段都查询出来

3.查询数据的时候过滤掉“标签”字段为空的数据

4.内存预热,在系统用户量少的时候(如凌晨两点)提前缓存好用户的伙伴匹配数据(注意,如果缓存占用内存过多,可以只针对一些关键用户做内存预热,如活跃度高的用户、vip用户)


用户匹配度计算

该系统主要根据用户的标签来判断用户是否匹配,如用户1的标签是[“java”,“python”,“前端”]、用户2的标签是[“java”,“python”,“前端”],则两个用户是完全匹配的。

接口使用编辑距离来计算两个标签的匹配度,若两者的编辑距离越小,则匹配度越高。编辑距离是针对两个字符串的差异程度的度量,即至少需要多少次操作(操作方式有:新增、删除、修改一个字符)才能将一个字符串转换成另一个字符串。本文接口对编辑距离的应用则是将一个标签集合转换成另一个标签集合至少需要多少次操作,可以查看如下示例:


输入: 标签集合1= ["java","python","前端","matlab"], 标签集合2= ["java","python","后端"]
输出: 2
解释: 
["java","python","前端","matlab"]-> ["java","python","后端","matlab"] (将 '前端' 替换为 '后端')
["java","python","后端","matlab"]-> ["java","python","后端"] (将 'matlab' 删除)


接口改进与测试

说明

以下接口测试使用相同的数据,目标是为当前登录用户推荐十个匹配度最高(编辑距离最小)的用户。


改进前

从下图可以看出,接口调用时间为5秒左右,图中对象的占用内存达到 200MB。

改进一(使用优先队列存储编辑距离较小的n个元素)

堆是一种实用的数据结构,常用来求解 Top K 问题,比如 如何快速获取点赞数量最多的十篇文章,本文接口目标是取出编辑距离最小(即匹配度最高)的十个用户。

实现思路:维护一个节点数量为10的大顶堆,节点的值为编辑距离,堆是一颗完全二叉树,堆中节点的值都大于等于其子节点的值,则堆顶元素的编辑距离最大。想要维护十个编辑距离最小的元素,只需要在遍历元素的时候,判断新元素的编辑距离是否小于堆顶元素的编辑距离,若小于,则踢出堆顶元素,加入新元素即可。在java中,可以使用优先队列 PriorityQueue 来实现大顶堆的操作。


/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers1(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    List<User> userList = this.list(queryWrapper);
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityQueue<Pair<User, Long>> priorityQueue = new PriorityQueue<Pair<User, Long>>(initialCapacity, comparator);
    // 四、先将前面num个元素添加到优先队列中
    int userListSize = userList.size();
    // 计算提前插入量(用户数量还不一定有查询数量多)
    int advanceInsertAmount = Math.min(initialCapacity, userListSize);
    // 已经插入优先队列的元素数量
    int insertNum = 0;
    // 记录当前所迭代到用户的索引
    int index = 0;
    while (insertNum < advanceInsertAmount && index < userListSize - 1) {
        // index++,是先get,之后才执行 +1 逻辑
        User user = userList.get(index++);
        String userTags = user.getTags();
        // 排除无标签的用户或者自己
        if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
            continue;
        } else {
            List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
            }.getType());
            // 计算编辑距离
            long distance = AlgorithmUtils.minDistance(tagList, userTagList);
            // 添加元素到堆中
            priorityQueue.add(new Pair<>(user, distance));
            insertNum++;
        }
    }
    // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
    for (int i = index; i < userListSize; i++) {
        User user = userList.get(i);
        String userTags = user.getTags();
        // 排除无标签的用户或者自己
        if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
            continue;
        }
        List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
        }.getType());
        // 计算编辑距离
        long distance = AlgorithmUtils.minDistance(tagList, userTagList);
        // 获取堆顶元素的编辑距离
        Long biggestDistance = priorityQueue.peek().getValue();
        if (distance < biggestDistance) {
            // 删除堆顶元素(删除距离最大的元素)
            priorityQueue.poll();
            // 添加距离更小的元素
            priorityQueue.add(new Pair<>(user, distance));
        }
    }
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}


如上图所示,priorityQueue的内存占用是25MB左右,远远小于未改进前的200MB。但是不要被迷惑了,上述代码一开始还是使用userList来接收所有用户数据,因此峰值内存并没有减少。


改进二(使用优先队列存储编辑距离较小的n个元素+数据分批查询、分批处理)

既然一开始使用userList来接收所有用户数据会占用不少内存,那是否可以对此进行优化呢?答案显然是可以的,那就是对数据进行分批查询(分页查询)即可,查询一批就处理一批,处理完直接将数据丢掉即可,具体操作可以查看下面的代码。


/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用+数据分批查询、分批处理)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers2(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 将数据分批,每批所要处理的数据量
    int batchSize = 200000;
    int current = 1;
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityQueue<Pair<User, Long>> priorityQueue = new PriorityQueue<Pair<User, Long>>(initialCapacity, comparator);
    while (true) {
        System.out.println("current:" + current);
        Page<User> userPage = this.page(new Page<>(current, batchSize), queryWrapper);
        List<User> userList = userPage.getRecords();
        System.out.println("userList内存大小" + RamUsageEstimator.humanSizeOf(userList));
        if (userList.size() == 0) {
            break;
        }
        System.out.println("当前用户id:" + userList.get(0).getId());
        // 四、先将前面num个元素添加到优先队列中
        // 记录当前所迭代到用户的索引
        int index = 0;
        if (current == 1) {
            int userListSize = userList.size();
            // 计算提前插入量(用户数量还不一定有查询数量多)
            int advanceInsertAmount = Math.min(initialCapacity, userListSize);
            // 已经插入优先队列的元素数量
            int insertNum = 0;
            while (insertNum < advanceInsertAmount && index < userListSize - 1) {
                // index++,是先get,之后才执行 +1 逻辑
                User user = userList.get(index++);
                String userTags = user.getTags();
                // 排除无标签的用户或者自己
                if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                    continue;
                } else {
                    List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                    }.getType());
                    // 计算编辑距离
                    long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                    // 添加元素到堆中
                    priorityQueue.add(new Pair<>(user, distance));
                    insertNum++;
                }
            }
        }
        // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
        for (int i = index; i < userList.size(); i++) {
            User user = userList.get(i);
            String userTags = user.getTags();
            // 排除无标签的用户或者自己
            if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                continue;
            }
            List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
            }.getType());
            // 计算编辑距离
            long distance = AlgorithmUtils.minDistance(tagList, userTagList);
            // 获取堆顶元素的编辑距离
            Long biggestDistance = priorityQueue.peek().getValue();
            if (distance < biggestDistance) {
                // 删除堆顶元素(删除距离最大的元素)
                priorityQueue.poll();
                // 添加距离更小的元素
                priorityQueue.add(new Pair<>(user, distance));
            }
        }
        current++;
    }
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}

下图为分批处理过程中,userList的内存占用,这样峰值内存占用就减下来了,但是接口调用时间却翻了一倍,妥妥的“时间换空间”了。


改进三(使用优先队列存储编辑距离较小的n个元素+数据多线程分批查询、分批处理)

既然数据都分批处理了,那为何不想办法让多个线程同时处理呢,这样接口调用时间就可以减下来了。要注意的是:PriorityQueue是线程不安全的,在使用多线程的时候,应该使用其好兄弟PriorityBlockingQueue。

/**
* 创建线程池
*/
private ExecutorService executor = new ThreadPoolExecutor(5, 1000, 10000, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000));
/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用+数据分批查询、分批处理)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers3(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 将数据分批,每批所要处理的数据量
    int batchSize = 200000;
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    long totalUserNum = baseMapper.selectCount(new QueryWrapper<>());
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityBlockingQueue<Pair<User, Long>> priorityQueue = new PriorityBlockingQueue<Pair<User, Long>>(initialCapacity, comparator);
    // 计算分批数
    int batchNum = totalUserNum / batchSize + (totalUserNum % batchSize) > 0 ? 1 : 0;
    List<CompletableFuture<Void>> futureList = new ArrayList<>();
    for (int current = 1; current <= batchNum; current++) {
        // 异步执行
        int finalCurrent = current;
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            Page<User> userPage = this.page(new Page<>(finalCurrent, batchSize), queryWrapper);
            List<User> userList = userPage.getRecords();
            // 四、先将前面num个元素添加到优先队列中
            // 记录当前所迭代到用户的索引
            int index = 0;
            if (finalCurrent == 1) {
                int userListSize = userList.size();
                // 计算提前插入量(用户数量还不一定有查询数量多)
                int advanceInsertAmount = Math.min(initialCapacity, userListSize);
                // 已经插入优先队列的元素数量
                int insertNum = 0;
                while (insertNum < advanceInsertAmount && index < userListSize - 1) {
                    // index++,是先get,之后才执行 +1 逻辑
                    User user = userList.get(index++);
                    String userTags = user.getTags();
                    // 排除无标签的用户或者自己
                    if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                        continue;
                    } else {
                        List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                        }.getType());
                        // 计算编辑距离
                        long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                        // 添加元素到堆中
                        priorityQueue.add(new Pair<>(user, distance));
                        insertNum++;
                    }
                }
            }
            // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
            for (int i = index; i < userList.size(); i++) {
                User user = userList.get(i);
                String userTags = user.getTags();
                // 排除无标签的用户或者自己
                if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                    continue;
                }
                List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                }.getType());
                // 计算编辑距离
                long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                // 获取堆顶元素的编辑距离
                Long biggestDistance = priorityQueue.peek().getValue();
                if (distance < biggestDistance) {
                    // 删除堆顶元素(删除距离最大的元素)
                    priorityQueue.poll();
                    // 添加距离更小的元素
                    priorityQueue.add(new Pair<>(user, distance));
                }
            }
        }, executor);
        futureList.add(future);
    }
    // 阻塞,等待所有线程执行完成
    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}


下图为应用上述改进后的测试结果,可以看到时间被打下来了。


总结

程序是可以不断优化的,希望大家可以多从不同的角度来思考如何改进自己对问题的解决方式。

作者的知识浅薄,如果有大佬有更好的改进思路,求不吝赐教。

目录
相关文章
|
1天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
1天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
15 1
|
5天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
6天前
|
Java 开发者
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
31 4
|
6天前
|
消息中间件 供应链 Java
掌握Java多线程编程的艺术
【10月更文挑战第29天】 在当今软件开发领域,多线程编程已成为提升应用性能和响应速度的关键手段之一。本文旨在深入探讨Java多线程编程的核心技术、常见问题以及最佳实践,通过实际案例分析,帮助读者理解并掌握如何在Java应用中高效地使用多线程。不同于常规的技术总结,本文将结合作者多年的实践经验,以故事化的方式讲述多线程编程的魅力与挑战,旨在为读者提供一种全新的学习视角。
27 3
|
缓存 Oracle IDE
深入分析Java反射(八)-优化反射调用性能
Java反射的API在JavaSE1.7的时候已经基本完善,但是本文编写的时候使用的是Oracle JDK11,因为JDK11对于sun包下的源码也上传了,可以直接通过IDE查看对应的源码和进行Debug。
383 0
|
13天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
93 38
|
10天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
14天前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
31 1
[Java]线程生命周期与线程通信
|
12天前
|
安全 Java
在 Java 中使用实现 Runnable 接口的方式创建线程
【10月更文挑战第22天】通过以上内容的介绍,相信你已经对在 Java 中如何使用实现 Runnable 接口的方式创建线程有了更深入的了解。在实际应用中,需要根据具体的需求和场景,合理选择线程创建方式,并注意线程安全、同步、通信等相关问题,以确保程序的正确性和稳定性。
下一篇
无影云桌面