前言
在几年前,我就看到过有些博客写关于合并请求的文章,一开始我没有太在意,最近在看一个up讲述关于商品模块的牛X设计,为了提高高并发的处理能力,一般会用redis 自增自减来实现库存扣减,但是他采用合并扣减库存,也就是同一时间n个扣减库存会合并成一个请求,这样无疑减少了IO次数,也提高系统性能。然后今天我看淘系有篇文章讲述 怎么提高自己的系统架构水平,里面也提到合并更新操作。
🐱那么这么秀的操作,真的不尝试下吗?Let's do it~
合并请求的目的
Why
A:你是否有遇到过类似场景,需要插入数据库很多很多条数据,那我们怎么优化?还有多个redis key请求的时候,如果我们每次都去拿到连接调用,可以发现时间会比较长,那怎么处理的?MQ场景,我们有一批的消息需要发送,是不是一个一个去投递?
答案是NO,我们会批量去处理这些任务,减少IO次数,其次批量处理可以减少往返网络通讯时间,提高效率。
通过简单demo感受下
秉着没吃过猪肉,也见过猪跑的想法,我们参照别人的思路:Future来实现异步调用,等到批量处理结果出来之后再返回。
show me your code
我们先想象下,需要什么变量,合并请求是随意合并吗? 答案当然不是,那么我们是将一段时间内的请求进行合并。
那么需要一个标识来表示是否符合这段时间,或者算法。比如说一秒内,都是这个需要合并请求的范围。下面就用i来计数,需求:比如说超过2个并发,我们就需要合并请求了。
其次我们是不是要收集所有请求的对象,还有请求处理完,我们是不是要查询处理结果集,这样我们才好返回数据。所以加上LinkedList来缓存这一时刻的请求实体,以及使用set数据结构来模拟数据库db或者其他缓存处理结果的地方。
//某一时间段并发数量,可能1秒内
static int i = 0;
//这个一时刻的处理操作的队列
LinkedList<String> list = new LinkedList<>();
//可以代替DB,或者处理结果缓存地方
Set<String> set = new HashSet<>();
- 我们需要模拟一个业务处理器,定时执行list列表的任务,然后把结果丢到set结果集里头
CompletableFuture.runAsync(() -> {
while (true) {
if(!list.isEmpty()){
Collections.shuffle(list);
set.addAll(list);
list.clear();
}
}
});
- 模拟并发合并请求操作
@GetMapping("firstRequest")
public boolean firstRequest() throws InterruptedException, ExecutionException, TimeoutException {
//假设firstRequest,secondRequest同一个时间进来的
i++;
Thread.sleep(2000);
if (i > 1) {
list.add("firstRequest");
}
return CompletableFuture.supplyAsync(() -> {
int i1 = 0;
while (true) {
i1++;
if (i1 > 5) {
return false;
}
if (set.remove("firstRequest")) {
return true;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).get();
}
@GetMapping("secondRequest")
public boolean secondRequest() throws ExecutionException, InterruptedException, TimeoutException {
i++;
if (i > 1) {
list.add("secondRequest");
}
return CompletableFuture.supplyAsync(() -> {
int i1 = 0;
while (true) {
i1++;
if (i1 > 5) {
return false;
}
if (set.remove("secondRequest")) {
return true;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).get();
}
🙋♂️ 我们看下第一个请求跟第二个请求,都做了啥操作?
首先第一个请求,会给i+1,代表当前时间段的计算器请求+1,然后等待一段时间,其实是为了,等第二个请求访问,让计算器达到并发等于2的场景。
然后分别将第一个跟第二个任务丢到list请求列表里头,然后我们异步任务去模拟业务处理,然后将处理结果丢到set结果返回集合中。请求通过future来定时请求返回结果,尝试5次,如果在这5次里头有结果,返回true。如果没有,则返回false。
到此我们代码实现完成了合并请求的逻辑。
合并请求的弊端
通过上面的例子,相信读者也看到有很多处等待重试拿数据的过程,比如说等待结果返回。
这个弊端都凸显出来了,如果数据量很大或者需要处理的任务很多的情况下,大家都在等待,那么服务器的资源一直被消耗,连接也会殆尽。
所以,我们需要有个合理的过期时间,比如说超过多少秒,请求的线程就不再等待直接返回超时,然后有补偿机制再次去查询处理结果。