伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】

简介: 伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】

接口背景

该接口来源于鱼皮大佬的星球项目——伙伴匹配系统,接口的作用是:根据当前登录用户的标签,为其匹配标签接近的用户,快速帮其找到志同道合的人


接口问题说明

当用户量较大的时候(如到达百万级别),伙伴匹配速度较慢,且占用的内存非常大,系统的并发量较低,希望可以通过一些策略来降低内存占用并加快匹配速度。


7d65767ad3c1472b9b4e8f16d5b44711.png

优化策略

鱼皮大佬在项目讲解的时候,已经提出了一些优化策略,如:


1.关掉数据库的查询日志,可以快30秒左右(不同机器提高时间不同,总之提升巨大)

2.查询数据的时候,只查询需要使用的字段,不要直接将所有字段都查询出来

3.查询数据的时候过滤掉“标签”字段为空的数据

4.内存预热,在系统用户量少的时候(如凌晨两点)提前缓存好用户的伙伴匹配数据(注意,如果缓存占用内存过多,可以只针对一些关键用户做内存预热,如活跃度高的用户、vip用户)


用户匹配度计算

该系统主要根据用户的标签来判断用户是否匹配,如用户1的标签是[“java”,“python”,“前端”]、用户2的标签是[“java”,“python”,“前端”],则两个用户是完全匹配的。

接口使用编辑距离来计算两个标签的匹配度,若两者的编辑距离越小,则匹配度越高。编辑距离是针对两个字符串的差异程度的度量,即至少需要多少次操作(操作方式有:新增、删除、修改一个字符)才能将一个字符串转换成另一个字符串。本文接口对编辑距离的应用则是将一个标签集合转换成另一个标签集合至少需要多少次操作,可以查看如下示例:


输入: 标签集合1= ["java","python","前端","matlab"], 标签集合2= ["java","python","后端"]
输出: 2
解释: 
["java","python","前端","matlab"]-> ["java","python","后端","matlab"] (将 '前端' 替换为 '后端')
["java","python","后端","matlab"]-> ["java","python","后端"] (将 'matlab' 删除)


接口改进与测试

说明

以下接口测试使用相同的数据,目标是为当前登录用户推荐十个匹配度最高(编辑距离最小)的用户。


改进前

从下图可以看出,接口调用时间为5秒左右,图中对象的占用内存达到 200MB。

改进一(使用优先队列存储编辑距离较小的n个元素)

堆是一种实用的数据结构,常用来求解 Top K 问题,比如 如何快速获取点赞数量最多的十篇文章,本文接口目标是取出编辑距离最小(即匹配度最高)的十个用户。

实现思路:维护一个节点数量为10的大顶堆,节点的值为编辑距离,堆是一颗完全二叉树,堆中节点的值都大于等于其子节点的值,则堆顶元素的编辑距离最大。想要维护十个编辑距离最小的元素,只需要在遍历元素的时候,判断新元素的编辑距离是否小于堆顶元素的编辑距离,若小于,则踢出堆顶元素,加入新元素即可。在java中,可以使用优先队列 PriorityQueue 来实现大顶堆的操作。


/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers1(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    List<User> userList = this.list(queryWrapper);
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityQueue<Pair<User, Long>> priorityQueue = new PriorityQueue<Pair<User, Long>>(initialCapacity, comparator);
    // 四、先将前面num个元素添加到优先队列中
    int userListSize = userList.size();
    // 计算提前插入量(用户数量还不一定有查询数量多)
    int advanceInsertAmount = Math.min(initialCapacity, userListSize);
    // 已经插入优先队列的元素数量
    int insertNum = 0;
    // 记录当前所迭代到用户的索引
    int index = 0;
    while (insertNum < advanceInsertAmount && index < userListSize - 1) {
        // index++,是先get,之后才执行 +1 逻辑
        User user = userList.get(index++);
        String userTags = user.getTags();
        // 排除无标签的用户或者自己
        if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
            continue;
        } else {
            List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
            }.getType());
            // 计算编辑距离
            long distance = AlgorithmUtils.minDistance(tagList, userTagList);
            // 添加元素到堆中
            priorityQueue.add(new Pair<>(user, distance));
            insertNum++;
        }
    }
    // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
    for (int i = index; i < userListSize; i++) {
        User user = userList.get(i);
        String userTags = user.getTags();
        // 排除无标签的用户或者自己
        if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
            continue;
        }
        List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
        }.getType());
        // 计算编辑距离
        long distance = AlgorithmUtils.minDistance(tagList, userTagList);
        // 获取堆顶元素的编辑距离
        Long biggestDistance = priorityQueue.peek().getValue();
        if (distance < biggestDistance) {
            // 删除堆顶元素(删除距离最大的元素)
            priorityQueue.poll();
            // 添加距离更小的元素
            priorityQueue.add(new Pair<>(user, distance));
        }
    }
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}


如上图所示,priorityQueue的内存占用是25MB左右,远远小于未改进前的200MB。但是不要被迷惑了,上述代码一开始还是使用userList来接收所有用户数据,因此峰值内存并没有减少。


改进二(使用优先队列存储编辑距离较小的n个元素+数据分批查询、分批处理)

既然一开始使用userList来接收所有用户数据会占用不少内存,那是否可以对此进行优化呢?答案显然是可以的,那就是对数据进行分批查询(分页查询)即可,查询一批就处理一批,处理完直接将数据丢掉即可,具体操作可以查看下面的代码。


/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用+数据分批查询、分批处理)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers2(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 将数据分批,每批所要处理的数据量
    int batchSize = 200000;
    int current = 1;
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityQueue<Pair<User, Long>> priorityQueue = new PriorityQueue<Pair<User, Long>>(initialCapacity, comparator);
    while (true) {
        System.out.println("current:" + current);
        Page<User> userPage = this.page(new Page<>(current, batchSize), queryWrapper);
        List<User> userList = userPage.getRecords();
        System.out.println("userList内存大小" + RamUsageEstimator.humanSizeOf(userList));
        if (userList.size() == 0) {
            break;
        }
        System.out.println("当前用户id:" + userList.get(0).getId());
        // 四、先将前面num个元素添加到优先队列中
        // 记录当前所迭代到用户的索引
        int index = 0;
        if (current == 1) {
            int userListSize = userList.size();
            // 计算提前插入量(用户数量还不一定有查询数量多)
            int advanceInsertAmount = Math.min(initialCapacity, userListSize);
            // 已经插入优先队列的元素数量
            int insertNum = 0;
            while (insertNum < advanceInsertAmount && index < userListSize - 1) {
                // index++,是先get,之后才执行 +1 逻辑
                User user = userList.get(index++);
                String userTags = user.getTags();
                // 排除无标签的用户或者自己
                if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                    continue;
                } else {
                    List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                    }.getType());
                    // 计算编辑距离
                    long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                    // 添加元素到堆中
                    priorityQueue.add(new Pair<>(user, distance));
                    insertNum++;
                }
            }
        }
        // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
        for (int i = index; i < userList.size(); i++) {
            User user = userList.get(i);
            String userTags = user.getTags();
            // 排除无标签的用户或者自己
            if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                continue;
            }
            List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
            }.getType());
            // 计算编辑距离
            long distance = AlgorithmUtils.minDistance(tagList, userTagList);
            // 获取堆顶元素的编辑距离
            Long biggestDistance = priorityQueue.peek().getValue();
            if (distance < biggestDistance) {
                // 删除堆顶元素(删除距离最大的元素)
                priorityQueue.poll();
                // 添加距离更小的元素
                priorityQueue.add(new Pair<>(user, distance));
            }
        }
        current++;
    }
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}

下图为分批处理过程中,userList的内存占用,这样峰值内存占用就减下来了,但是接口调用时间却翻了一倍,妥妥的“时间换空间”了。


改进三(使用优先队列存储编辑距离较小的n个元素+数据多线程分批查询、分批处理)

既然数据都分批处理了,那为何不想办法让多个线程同时处理呢,这样接口调用时间就可以减下来了。要注意的是:PriorityQueue是线程不安全的,在使用多线程的时候,应该使用其好兄弟PriorityBlockingQueue。

/**
* 创建线程池
*/
private ExecutorService executor = new ThreadPoolExecutor(5, 1000, 10000, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000));
/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用+数据分批查询、分批处理)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers3(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 将数据分批,每批所要处理的数据量
    int batchSize = 200000;
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    long totalUserNum = baseMapper.selectCount(new QueryWrapper<>());
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityBlockingQueue<Pair<User, Long>> priorityQueue = new PriorityBlockingQueue<Pair<User, Long>>(initialCapacity, comparator);
    // 计算分批数
    int batchNum = totalUserNum / batchSize + (totalUserNum % batchSize) > 0 ? 1 : 0;
    List<CompletableFuture<Void>> futureList = new ArrayList<>();
    for (int current = 1; current <= batchNum; current++) {
        // 异步执行
        int finalCurrent = current;
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            Page<User> userPage = this.page(new Page<>(finalCurrent, batchSize), queryWrapper);
            List<User> userList = userPage.getRecords();
            // 四、先将前面num个元素添加到优先队列中
            // 记录当前所迭代到用户的索引
            int index = 0;
            if (finalCurrent == 1) {
                int userListSize = userList.size();
                // 计算提前插入量(用户数量还不一定有查询数量多)
                int advanceInsertAmount = Math.min(initialCapacity, userListSize);
                // 已经插入优先队列的元素数量
                int insertNum = 0;
                while (insertNum < advanceInsertAmount && index < userListSize - 1) {
                    // index++,是先get,之后才执行 +1 逻辑
                    User user = userList.get(index++);
                    String userTags = user.getTags();
                    // 排除无标签的用户或者自己
                    if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                        continue;
                    } else {
                        List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                        }.getType());
                        // 计算编辑距离
                        long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                        // 添加元素到堆中
                        priorityQueue.add(new Pair<>(user, distance));
                        insertNum++;
                    }
                }
            }
            // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
            for (int i = index; i < userList.size(); i++) {
                User user = userList.get(i);
                String userTags = user.getTags();
                // 排除无标签的用户或者自己
                if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                    continue;
                }
                List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                }.getType());
                // 计算编辑距离
                long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                // 获取堆顶元素的编辑距离
                Long biggestDistance = priorityQueue.peek().getValue();
                if (distance < biggestDistance) {
                    // 删除堆顶元素(删除距离最大的元素)
                    priorityQueue.poll();
                    // 添加距离更小的元素
                    priorityQueue.add(new Pair<>(user, distance));
                }
            }
        }, executor);
        futureList.add(future);
    }
    // 阻塞,等待所有线程执行完成
    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}


下图为应用上述改进后的测试结果,可以看到时间被打下来了。


总结

程序是可以不断优化的,希望大家可以多从不同的角度来思考如何改进自己对问题的解决方式。

作者的知识浅薄,如果有大佬有更好的改进思路,求不吝赐教。

目录
相关文章
|
3天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
26 6
|
16天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
11天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
11天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
32 3
|
12天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
16天前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
26 2
|
16天前
|
监控 Java 开发者
Java线程管理:守护线程与本地线程的深入剖析
在Java编程语言中,线程是程序执行的最小单元,它们可以并行执行以提高程序的效率和响应性。Java提供了两种特殊的线程类型:守护线程和本地线程。本文将深入探讨这两种线程的区别,并探讨它们在实际开发中的应用。
23 1
|
7月前
|
存储 安全 Java
深入理解Java并发编程:线程安全与锁机制
【5月更文挑战第31天】在Java并发编程中,线程安全和锁机制是两个核心概念。本文将深入探讨这两个概念,包括它们的定义、实现方式以及在实际开发中的应用。通过对线程安全和锁机制的深入理解,可以帮助我们更好地解决并发编程中的问题,提高程序的性能和稳定性。
|
4月前
|
存储 安全 Java
解锁Java并发编程奥秘:深入剖析Synchronized关键字的同步机制与实现原理,让多线程安全如磐石般稳固!
【8月更文挑战第4天】Java并发编程中,Synchronized关键字是确保多线程环境下数据一致性与线程安全的基础机制。它可通过修饰实例方法、静态方法或代码块来控制对共享资源的独占访问。Synchronized基于Java对象头中的监视器锁实现,通过MonitorEnter/MonitorExit指令管理锁的获取与释放。示例展示了如何使用Synchronized修饰方法以实现线程间的同步,避免数据竞争。掌握其原理对编写高效安全的多线程程序极为关键。
71 1
|
5月前
|
安全 Java 开发者
Java并发编程中的线程安全问题及解决方案探讨
在Java编程中,特别是在并发编程领域,线程安全问题是开发过程中常见且关键的挑战。本文将深入探讨Java中的线程安全性,分析常见的线程安全问题,并介绍相应的解决方案,帮助开发者更好地理解和应对并发环境下的挑战。【7月更文挑战第3天】
104 0