效率加倍,高并发场景下的接口请求合并方案

简介: 效率加倍,高并发场景下的接口请求合并方案


前言

请求合并到底有什么意义呢?我们来看下图。

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

代码实现

查询用户的代码

public interface UserService {
    Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}
@Service
public class UserServiceImpl implements UserService {
    @Resource
    private UsersMapper usersMapper;
    @Override
    public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
        // 全部参数
        List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        // 用in语句合并成一条SQL,避免多次请求数据库的IO
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);
        Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
        HashMap<String, Users> result = new HashMap<>();
        userReqs.forEach(val -> {
            List<Users> usersList = userGroup.get(val.getUserId());
            if (!CollectionUtils.isEmpty(usersList)) {
                result.put(val.getRequestId(), usersList.get(0));
            } else {
                // 表示没数据
                result.put(val.getRequestId(), null);
            }
        });
        return result;
    }
}

合并请求的实现

 package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
/***
 * zzq
 * 包装成批量执行的地方
 * */
@Service
public class UserWrapBatchService {
    @Resource
    private UserService userService;
    /**
     * 最大任务数
     **/
    public static int MAX_TASK_NUM = 100;
    /**
     * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
     * CompletableFuture将处理结果返回
     */
    public class Request {
        // 请求id 唯一
        String requestId;
        // 参数
        Long userId;
        //TODO Java 8 的 CompletableFuture 并没有 timeout 机制
        CompletableFuture<Users> completableFuture;
        public String getRequestId() {
            return requestId;
        }
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
        public Long getUserId() {
            return userId;
        }
        public void setUserId(Long userId) {
            this.userId = userId;
        }
        public CompletableFuture getCompletableFuture() {
            return completableFuture;
        }
        public void setCompletableFuture(CompletableFuture completableFuture) {
            this.completableFuture = completableFuture;
        }
    }
    /*
    LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
    LinkedBlockingQueue与ArrayBlockingQueue的区别
    ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
    ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
    两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
    而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
    也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
     */
    private final Queue<Request> queue = new LinkedBlockingQueue();
    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合并了 [" + size + "] 个请求");
            //将队列的请求消费到一个集合保存
            for (int i = 0; i < size; i++) {
                // 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
                if (i < MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
            List<Request> userReqs = new ArrayList<>();
            for (Request request : list) {
                userReqs.add(request);
            }
            //将参数传入service处理, 这里是本地服务,也可以把userService 看成RPC之类的远程调用
            Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
            //将处理结果返回各自的请求
            for (Request request : list) {
                Users result = response.get(request.requestId);
                request.completableFuture.complete(result);    //completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
            }
        }, 100, 10, TimeUnit.MILLISECONDS);
        //scheduleAtFixedRate是周期性执行 schedule是延迟执行 initialDelay是初始延迟 period是周期间隔 后面是单位
        //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次
    }
    public Users queryUser(Long userId) {
        Request request = new Request();
        // 这里用UUID做请求id
        request.requestId = UUID.randomUUID().toString().replace("-", "");
        request.userId = userId;
        CompletableFuture<Users> future = new CompletableFuture<>();
        request.completableFuture = future;
        //将对象传入队列
        queue.offer(request);
        //如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

控制层调用

/***
 * 请求合并
 * */
@RequestMapping("/merge")
public Callable<Users> merge(Long userId) {
    return new Callable<Users>() {
        @Override
        public Users call() throws Exception {
            return userBatchService.queryUser(userId);
        }
    };
}

Callable是什么可以参考:

模拟高并发查询的代码

package com.springboot.sample;
import org.springframework.web.client.RestTemplate;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class TestBatch {
    private static int threadCount = 30;
    private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //为保证30个线程同时并发运行
    private static final RestTemplate restTemplate = new RestTemplate();
    public static void main(String[] args) {
        for (int i = 0; i < threadCount; i++) {//循环开30个线程
            new Thread(new Runnable() {
                public void run() {
                    COUNT_DOWN_LATCH.countDown();//每次减一
                    try {
                        COUNT_DOWN_LATCH.await(); //此处等待状态,为了让30个线程同时进行
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    for (int j = 1; j <= 3; j++) {
                        int param = new Random().nextInt(4);
                        if (param <=0){
                            param++;
                        }
                        String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
                        System.out.println(Thread.currentThread().getName() + "参数 " + param + " 返回值 " + responseBody);
                    }
                }
            }).start();
        }
    }
}

测试效果

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制
  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

核心代码

package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
/***
 * zzq
 * 包装成批量执行的地方,使用queue解决超时问题
 * */
@Service
public class UserWrapBatchQueueService {
    @Resource
    private UserService userService;
    /**
     * 最大任务数
     **/
    public static int MAX_TASK_NUM = 100;
    /**
     * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
     * CompletableFuture将处理结果返回
     */
    public class Request {
        // 请求id
        String requestId;
        // 参数
        Long userId;
        // 队列,这个有超时机制
        LinkedBlockingQueue<Users> usersQueue;
        public String getRequestId() {
            return requestId;
        }
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
        public Long getUserId() {
            return userId;
        }
        public void setUserId(Long userId) {
            this.userId = userId;
        }
        public LinkedBlockingQueue<Users> getUsersQueue() {
            return usersQueue;
        }
        public void setUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
            this.usersQueue = usersQueue;
        }
    }
    /*
    LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
    LinkedBlockingQueue与ArrayBlockingQueue的区别
    ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
    ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
    两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
    而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
    也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
     */
    private final Queue<Request> queue = new LinkedBlockingQueue();
    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合并了 [" + size + "] 个请求");
            //将队列的请求消费到一个集合保存
            for (int i = 0; i < size; i++) {
                // 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
                if (i < MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
            List<Request> userReqs = new ArrayList<>();
            for (Request request : list) {
                userReqs.add(request);
            }
            //将参数传入service处理, 这里是本地服务,也可以把userService 看成RPC之类的远程调用
            Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
            for (Request userReq : userReqs) {
                // 这里再把结果放到队列里
                Users users = response.get(userReq.getRequestId());
                userReq.usersQueue.offer(users);
            }
        }, 100, 10, TimeUnit.MILLISECONDS);
        //scheduleAtFixedRate是周期性执行 schedule是延迟执行 initialDelay是初始延迟 period是周期间隔 后面是单位
        //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次
    }
    public Users queryUser(Long userId) {
        Request request = new Request();
        // 这里用UUID做请求id
        request.requestId = UUID.randomUUID().toString().replace("-", "");
        request.userId = userId;
        LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
        request.usersQueue = usersQueue;
        //将对象传入队列
        queue.offer(request);
        //取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
        try {
            return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}
...省略..
    @Override
    public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
        // 全部参数
        List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        // 用in语句合并成一条SQL,避免多次请求数据库的IO
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);
        Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
        HashMap<String, Users> result = new HashMap<>();
        // 数据分组
        userReqs.forEach(val -> {
            List<Users> usersList = userGroup.get(val.getUserId());
            if (!CollectionUtils.isEmpty(usersList)) {
                result.put(val.getRequestId(), usersList.get(0));
            } else {
                // 表示没数据 , 这里要new,不然加入队列会空指针
                result.put(val.getRequestId(), new Users());
            }
        });
        return result;
    }
...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

参考



相关文章
|
4天前
|
缓存 监控 Java
Java 线程池在高并发场景下有哪些优势和潜在问题?
Java 线程池在高并发场景下有哪些优势和潜在问题?
|
6天前
|
缓存 负载均衡 API
抖音抖店API请求获取宝贝详情数据、原价、销量、主图等参数可支持高并发调用接入演示
这是一个使用Python编写的示例代码,用于从抖音抖店API获取商品详情,包括原价、销量和主图等信息。示例展示了如何构建请求、处理响应及提取所需数据。针对高并发场景,建议采用缓存、限流、负载均衡、异步处理及代码优化等策略,以提升性能和稳定性。
|
9天前
|
NoSQL Java Redis
京东双十一高并发场景下的分布式锁性能优化
【10月更文挑战第20天】在电商领域,尤其是像京东双十一这样的大促活动,系统需要处理极高的并发请求。这些请求往往涉及库存的查询和更新,如果处理不当,很容易出现库存超卖、数据不一致等问题。
29 1
|
22天前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
40 4
|
24天前
|
Java Linux
【网络】高并发场景处理:线程池和IO多路复用
【网络】高并发场景处理:线程池和IO多路复用
36 2
|
18天前
|
Java Linux 应用服务中间件
【编程进阶知识】高并发场景下Bio与Nio的比较及原理示意图
本文介绍了在Linux系统上使用Tomcat部署Java应用程序时,BIO(阻塞I/O)和NIO(非阻塞I/O)在网络编程中的实现和性能差异。BIO采用传统的线程模型,每个连接请求都会创建一个新线程进行处理,导致在高并发场景下存在严重的性能瓶颈,如阻塞等待和线程创建开销大等问题。而NIO则通过事件驱动机制,利用事件注册、事件轮询器和事件通知,实现了更高效的连接管理和数据传输,避免了阻塞和多级数据复制,显著提升了系统的并发处理能力。
34 0
|
2月前
|
缓存 分布式计算 Hadoop
HBase在高并发场景下的性能分析
HBase在高并发场景下的性能受到多方面因素的影响,包括数据模型设计、集群配置、读写策略及性能调优等。合理的设计和配置可以显著提高HBase在高并发环境下的性能。不过,需要注意的是,由于项目和业务需求的不同,性能优化并没有一劳永逸的解决方案,需要根据实际情况进行针对性的调整和优化。
82 8
|
21天前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
62 0
|
6月前
|
消息中间件 Java Linux
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
|
5月前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
471 0