伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】

简介: 伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】

接口背景

该接口来源于鱼皮大佬的星球项目——伙伴匹配系统,接口的作用是:根据当前登录用户的标签,为其匹配标签接近的用户,快速帮其找到志同道合的人


接口问题说明

当用户量较大的时候(如到达百万级别),伙伴匹配速度较慢,且占用的内存非常大,系统的并发量较低,希望可以通过一些策略来降低内存占用并加快匹配速度。


7d65767ad3c1472b9b4e8f16d5b44711.png

优化策略

鱼皮大佬在项目讲解的时候,已经提出了一些优化策略,如:


1.关掉数据库的查询日志,可以快30秒左右(不同机器提高时间不同,总之提升巨大)

2.查询数据的时候,只查询需要使用的字段,不要直接将所有字段都查询出来

3.查询数据的时候过滤掉“标签”字段为空的数据

4.内存预热,在系统用户量少的时候(如凌晨两点)提前缓存好用户的伙伴匹配数据(注意,如果缓存占用内存过多,可以只针对一些关键用户做内存预热,如活跃度高的用户、vip用户)


用户匹配度计算

该系统主要根据用户的标签来判断用户是否匹配,如用户1的标签是[“java”,“python”,“前端”]、用户2的标签是[“java”,“python”,“前端”],则两个用户是完全匹配的。

接口使用编辑距离来计算两个标签的匹配度,若两者的编辑距离越小,则匹配度越高。编辑距离是针对两个字符串的差异程度的度量,即至少需要多少次操作(操作方式有:新增、删除、修改一个字符)才能将一个字符串转换成另一个字符串。本文接口对编辑距离的应用则是将一个标签集合转换成另一个标签集合至少需要多少次操作,可以查看如下示例:


输入: 标签集合1= ["java","python","前端","matlab"], 标签集合2= ["java","python","后端"]
输出: 2
解释: 
["java","python","前端","matlab"]-> ["java","python","后端","matlab"] (将 '前端' 替换为 '后端')
["java","python","后端","matlab"]-> ["java","python","后端"] (将 'matlab' 删除)


接口改进与测试

说明

以下接口测试使用相同的数据,目标是为当前登录用户推荐十个匹配度最高(编辑距离最小)的用户。


改进前

从下图可以看出,接口调用时间为5秒左右,图中对象的占用内存达到 200MB。

改进一(使用优先队列存储编辑距离较小的n个元素)

堆是一种实用的数据结构,常用来求解 Top K 问题,比如 如何快速获取点赞数量最多的十篇文章,本文接口目标是取出编辑距离最小(即匹配度最高)的十个用户。

实现思路:维护一个节点数量为10的大顶堆,节点的值为编辑距离,堆是一颗完全二叉树,堆中节点的值都大于等于其子节点的值,则堆顶元素的编辑距离最大。想要维护十个编辑距离最小的元素,只需要在遍历元素的时候,判断新元素的编辑距离是否小于堆顶元素的编辑距离,若小于,则踢出堆顶元素,加入新元素即可。在java中,可以使用优先队列 PriorityQueue 来实现大顶堆的操作。


/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers1(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    List<User> userList = this.list(queryWrapper);
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityQueue<Pair<User, Long>> priorityQueue = new PriorityQueue<Pair<User, Long>>(initialCapacity, comparator);
    // 四、先将前面num个元素添加到优先队列中
    int userListSize = userList.size();
    // 计算提前插入量(用户数量还不一定有查询数量多)
    int advanceInsertAmount = Math.min(initialCapacity, userListSize);
    // 已经插入优先队列的元素数量
    int insertNum = 0;
    // 记录当前所迭代到用户的索引
    int index = 0;
    while (insertNum < advanceInsertAmount && index < userListSize - 1) {
        // index++,是先get,之后才执行 +1 逻辑
        User user = userList.get(index++);
        String userTags = user.getTags();
        // 排除无标签的用户或者自己
        if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
            continue;
        } else {
            List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
            }.getType());
            // 计算编辑距离
            long distance = AlgorithmUtils.minDistance(tagList, userTagList);
            // 添加元素到堆中
            priorityQueue.add(new Pair<>(user, distance));
            insertNum++;
        }
    }
    // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
    for (int i = index; i < userListSize; i++) {
        User user = userList.get(i);
        String userTags = user.getTags();
        // 排除无标签的用户或者自己
        if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
            continue;
        }
        List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
        }.getType());
        // 计算编辑距离
        long distance = AlgorithmUtils.minDistance(tagList, userTagList);
        // 获取堆顶元素的编辑距离
        Long biggestDistance = priorityQueue.peek().getValue();
        if (distance < biggestDistance) {
            // 删除堆顶元素(删除距离最大的元素)
            priorityQueue.poll();
            // 添加距离更小的元素
            priorityQueue.add(new Pair<>(user, distance));
        }
    }
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}


如上图所示,priorityQueue的内存占用是25MB左右,远远小于未改进前的200MB。但是不要被迷惑了,上述代码一开始还是使用userList来接收所有用户数据,因此峰值内存并没有减少。


改进二(使用优先队列存储编辑距离较小的n个元素+数据分批查询、分批处理)

既然一开始使用userList来接收所有用户数据会占用不少内存,那是否可以对此进行优化呢?答案显然是可以的,那就是对数据进行分批查询(分页查询)即可,查询一批就处理一批,处理完直接将数据丢掉即可,具体操作可以查看下面的代码。


/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用+数据分批查询、分批处理)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers2(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 将数据分批,每批所要处理的数据量
    int batchSize = 200000;
    int current = 1;
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityQueue<Pair<User, Long>> priorityQueue = new PriorityQueue<Pair<User, Long>>(initialCapacity, comparator);
    while (true) {
        System.out.println("current:" + current);
        Page<User> userPage = this.page(new Page<>(current, batchSize), queryWrapper);
        List<User> userList = userPage.getRecords();
        System.out.println("userList内存大小" + RamUsageEstimator.humanSizeOf(userList));
        if (userList.size() == 0) {
            break;
        }
        System.out.println("当前用户id:" + userList.get(0).getId());
        // 四、先将前面num个元素添加到优先队列中
        // 记录当前所迭代到用户的索引
        int index = 0;
        if (current == 1) {
            int userListSize = userList.size();
            // 计算提前插入量(用户数量还不一定有查询数量多)
            int advanceInsertAmount = Math.min(initialCapacity, userListSize);
            // 已经插入优先队列的元素数量
            int insertNum = 0;
            while (insertNum < advanceInsertAmount && index < userListSize - 1) {
                // index++,是先get,之后才执行 +1 逻辑
                User user = userList.get(index++);
                String userTags = user.getTags();
                // 排除无标签的用户或者自己
                if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                    continue;
                } else {
                    List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                    }.getType());
                    // 计算编辑距离
                    long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                    // 添加元素到堆中
                    priorityQueue.add(new Pair<>(user, distance));
                    insertNum++;
                }
            }
        }
        // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
        for (int i = index; i < userList.size(); i++) {
            User user = userList.get(i);
            String userTags = user.getTags();
            // 排除无标签的用户或者自己
            if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                continue;
            }
            List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
            }.getType());
            // 计算编辑距离
            long distance = AlgorithmUtils.minDistance(tagList, userTagList);
            // 获取堆顶元素的编辑距离
            Long biggestDistance = priorityQueue.peek().getValue();
            if (distance < biggestDistance) {
                // 删除堆顶元素(删除距离最大的元素)
                priorityQueue.poll();
                // 添加距离更小的元素
                priorityQueue.add(new Pair<>(user, distance));
            }
        }
        current++;
    }
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}

下图为分批处理过程中,userList的内存占用,这样峰值内存占用就减下来了,但是接口调用时间却翻了一倍,妥妥的“时间换空间”了。


改进三(使用优先队列存储编辑距离较小的n个元素+数据多线程分批查询、分批处理)

既然数据都分批处理了,那为何不想办法让多个线程同时处理呢,这样接口调用时间就可以减下来了。要注意的是:PriorityQueue是线程不安全的,在使用多线程的时候,应该使用其好兄弟PriorityBlockingQueue。

/**
* 创建线程池
*/
private ExecutorService executor = new ThreadPoolExecutor(5, 1000, 10000, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000));
/**
 * 获取前num个最匹配的用户(使用优先队列优化内存占用+数据分批查询、分批处理)
 *
 * @param num
 * @param loginUser
 * @return
 */
public List<User> matchUsers3(int num, User loginUser) {
    long start = System.currentTimeMillis();
    // 将数据分批,每批所要处理的数据量
    int batchSize = 200000;
    // 一、获取当前登录用户的标签信息
    Gson gson = new Gson();
    // 将字符串json反序列回 集合
    List<String> tagList = gson.fromJson(loginUser.getTags(), new TypeToken<List<String>>() {
    }.getType());
    // 二、查询数据库中的所有用户数据(只查询两个字段,且只查询tags不为空的数据)
    long totalUserNum = baseMapper.selectCount(new QueryWrapper<>());
    QueryWrapper<User> queryWrapper = new QueryWrapper<User>().select("id", "tags").isNotNull("tags");
    // 三、创建优先队列,用来存储前num个距离最小(距离越小,匹配度越高)的用户
    // 创建比较器(按照编辑距离降序排序)
    Comparator<Pair<User, Long>> comparator = new Comparator<Pair<User, Long>>() {
        public int compare(Pair<User, Long> o1, Pair<User, Long> o2) {
            return -Long.compare(o1.getValue(), o2.getValue());
        }
    };
    // 堆的初始容量
    int initialCapacity = num;
    // 维护一个大顶堆,堆的顶部元素最大,在迭代的时候,如果新的距离比堆顶元素更小,则将堆顶元素踢出,添加新的元素
    PriorityBlockingQueue<Pair<User, Long>> priorityQueue = new PriorityBlockingQueue<Pair<User, Long>>(initialCapacity, comparator);
    // 计算分批数
    int batchNum = totalUserNum / batchSize + (totalUserNum % batchSize) > 0 ? 1 : 0;
    List<CompletableFuture<Void>> futureList = new ArrayList<>();
    for (int current = 1; current <= batchNum; current++) {
        // 异步执行
        int finalCurrent = current;
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            Page<User> userPage = this.page(new Page<>(finalCurrent, batchSize), queryWrapper);
            List<User> userList = userPage.getRecords();
            // 四、先将前面num个元素添加到优先队列中
            // 记录当前所迭代到用户的索引
            int index = 0;
            if (finalCurrent == 1) {
                int userListSize = userList.size();
                // 计算提前插入量(用户数量还不一定有查询数量多)
                int advanceInsertAmount = Math.min(initialCapacity, userListSize);
                // 已经插入优先队列的元素数量
                int insertNum = 0;
                while (insertNum < advanceInsertAmount && index < userListSize - 1) {
                    // index++,是先get,之后才执行 +1 逻辑
                    User user = userList.get(index++);
                    String userTags = user.getTags();
                    // 排除无标签的用户或者自己
                    if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                        continue;
                    } else {
                        List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                        }.getType());
                        // 计算编辑距离
                        long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                        // 添加元素到堆中
                        priorityQueue.add(new Pair<>(user, distance));
                        insertNum++;
                    }
                }
            }
            // 五、依次计算剩余所有用户的编辑距离,并更新优先队列的元素
            for (int i = index; i < userList.size(); i++) {
                User user = userList.get(i);
                String userTags = user.getTags();
                // 排除无标签的用户或者自己
                if (StringUtils.isBlank(userTags) || user.getId() == loginUser.getId()) {
                    continue;
                }
                List<String> userTagList = gson.fromJson(userTags, new TypeToken<List<String>>() {
                }.getType());
                // 计算编辑距离
                long distance = AlgorithmUtils.minDistance(tagList, userTagList);
                // 获取堆顶元素的编辑距离
                Long biggestDistance = priorityQueue.peek().getValue();
                if (distance < biggestDistance) {
                    // 删除堆顶元素(删除距离最大的元素)
                    priorityQueue.poll();
                    // 添加距离更小的元素
                    priorityQueue.add(new Pair<>(user, distance));
                }
            }
        }, executor);
        futureList.add(future);
    }
    // 阻塞,等待所有线程执行完成
    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
    // 六、获取用户的详细信息
    List<Long> userIdList = priorityQueue.stream().map(pair -> pair.getKey().getId()).collect(Collectors.toList());
    List<User> finalUserList = this.list(new QueryWrapper<User>().in("id", userIdList))
            .stream()
            // 用户数据脱敏
            .map(user -> getSafetyUser(user)).collect(Collectors.toList());
    long end = System.currentTimeMillis();
    System.out.println("匹配时间:" + (end - start) + "ms");
    System.out.println("队列长度:" + priorityQueue.size());
//        System.out.println("使用内存统计-----------------------------------------------------------------------------------------");
//        System.out.println("priorityQueue内存大小" + RamUsageEstimator.sizeOf(priorityQueue));
    System.out.println("priorityQueue内存大小" + RamUsageEstimator.humanSizeOf(priorityQueue));
    for (Pair<User, Long> userPair : priorityQueue) {
        System.out.println("用户id:" + userPair.getKey().getId() + ",距离:" + userPair.getValue());
    }
    System.out.println();
    return finalUserList;
}


下图为应用上述改进后的测试结果,可以看到时间被打下来了。


总结

程序是可以不断优化的,希望大家可以多从不同的角度来思考如何改进自己对问题的解决方式。

作者的知识浅薄,如果有大佬有更好的改进思路,求不吝赐教。

目录
相关文章
|
1天前
|
并行计算 Java API
Java 8中的接口默认方法和静态方法以及并行数组
【5月更文挑战第19天】Java 8引入了许多新特性,其中包括接口的默认方法和静态方法,以及并行数组的能力。这些特性增强了Java的面向对象编程模型和数组处理能力。让我们深入了解它们的概念和实践。
19 2
|
1天前
|
Java
Java一分钟之-并发编程:线程间通信(Phaser, CyclicBarrier, Semaphore)
【5月更文挑战第19天】Java并发编程中,Phaser、CyclicBarrier和Semaphore是三种强大的同步工具。Phaser用于阶段性任务协调,支持动态注册;CyclicBarrier允许线程同步执行,适合循环任务;Semaphore控制资源访问线程数,常用于限流和资源池管理。了解其使用场景、常见问题及避免策略,结合代码示例,能有效提升并发程序效率。注意异常处理和资源管理,以防止并发问题。
22 2
|
1天前
|
安全 Java 容器
Java一分钟之-并发编程:线程安全的集合类
【5月更文挑战第19天】Java提供线程安全集合类以解决并发环境中的数据一致性问题。例如,Vector是线程安全但效率低;可以使用Collections.synchronizedXxx将ArrayList或HashMap同步;ConcurrentHashMap是高效线程安全的映射;CopyOnWriteArrayList和CopyOnWriteArraySet适合读多写少场景;LinkedBlockingQueue是生产者-消费者模型中的线程安全队列。注意,过度同步可能影响性能,应尽量减少共享状态并利用并发工具类。
15 2
|
1天前
|
Java 程序员 调度
Java中的多线程编程:基础知识与实践
【5月更文挑战第19天】多线程编程是Java中的一个重要概念,它允许程序员在同一时间执行多个任务。本文将介绍Java多线程的基础知识,包括线程的创建、启动和管理,以及如何通过多线程提高程序的性能和响应性。
|
2天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
12 5
|
2天前
|
消息中间件 安全 Java
理解Java中的多线程编程
【5月更文挑战第18天】本文介绍了Java中的多线程编程,包括线程和多线程的基本概念。Java通过继承Thread类或实现Runnable接口来创建线程,此外还支持使用线程池(如ExecutorService和Executors)进行更高效的管理。多线程编程需要注意线程安全、性能优化和线程间通信,以避免数据竞争、死锁等问题,并确保程序高效运行。
|
2天前
|
存储 Java
【Java】实现一个简单的线程池
,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。
10 0
|
2天前
|
安全 Java 容器
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第18天】随着多核处理器的普及,并发编程变得越来越重要。Java提供了丰富的并发编程工具,如synchronized关键字、显式锁Lock、原子类、并发容器等。本文将深入探讨Java并发编程的核心概念,包括线程安全、死锁、资源竞争等,并分享一些性能优化的技巧。
|
2天前
|
安全 Java 开发者
Java中的多线程编程:理解与实践
【5月更文挑战第18天】在现代软件开发中,多线程编程是提高程序性能和响应速度的重要手段。Java作为一种广泛使用的编程语言,其内置的多线程支持使得开发者能够轻松地实现并行处理。本文将深入探讨Java多线程的基本概念、实现方式以及常见的并发问题,并通过实例代码演示如何高效地使用多线程技术。通过阅读本文,读者将对Java多线程编程有一个全面的认识,并能够在实际开发中灵活运用。
|
2天前
|
算法 Java 程序员
Java中的线程同步与并发控制
【5月更文挑战第18天】随着计算机技术的不断发展,多核处理器的普及使得多线程编程成为提高程序性能的关键。在Java中,线程是实现并发的一种重要手段。然而,线程的并发执行可能导致数据不一致、死锁等问题。本文将深入探讨Java中线程同步的方法和技巧,以及如何避免常见的并发问题,从而提高程序的性能和稳定性。