场景回溯
本人所在的项目是一个支付项目,有个场景就是当用户下单之后,需要及时的知道订单的支付状态,有的渠道回调比较慢,故在用户下单之后将订单信息放入redis,然后不断的去轮询调用渠道方订单查询接口。
问题复现
原始版本
不断的从redis中消费数据,然后调用渠道方订单查询接口查询订单状态,如果返回的状态是未支付的话,则会重新放到redis中,等待下一次在进行查询。每个订单在2分钟内可能会调用渠道方接口查询很多次,对渠道方的接口压力比较大。订单量小的时候还好,订单量大的时候,渠道方时常过来骂人,然后把我们的ip拉入黑名单,被怼了几次之后,我们妥协了,决定优化下相关的代码。
1.0版本
有鉴于调用渠道方的订单查询接口太频繁了,所以我们做一个优化,比如A订单,第一次查询的状态是未支付的话,则会将该订单放入map中做一个标记,同时会重新放回redis中,下次从redis中消费数据时,会首先判断下这个订单在map中有没有,如果有的话则会判断其放入到map中的时间是否超过5秒钟,如果没有的话,则会重新放入redis中。等待下一次轮询。经过一轮代码的Review,大家都一致认为没啥大问题,交个测试测的话也没有啥大问题。就这么愉快的上线了。
1.0版本上线之后,线上立刻出现了CPU飙升的情况,飙升的情况太明显了。因为redis本身的吞吐量比较高,在这种情况下,1秒钟内,可能同一笔订单会被put到redis很多次。
1.伪代码如下:
public void run() { Map<String, Object> queryedOrderMap = new HashMap<>(); while (true) { //1.从队列中取出元素 String needCheckStr = redisService.rpop(redisKey); //判断订单放入redis是否超过2分钟 if(订单放入redis超过2分钟){ continue; } //2.判断元素在map中是否存在 if (queryedOrderMap.containsKey(queryPayOrderVO.getOrderNo())) { if (查询的间隔时间不足5秒) { redisService.lpush(redisKey, paramStr); continue; } } //3. 调用通道的接口 boolean result = payApiService.queryPayOrder(queryPayOrderVO); if(!result){ queryedOrderMap.put(queryPayOrderVO.getOrderNo() new Date()); redisService.lpush(redisKey, paramStr); } }
问题就出在了第二步上面了,假设队列里只有一个元素,这个元素在第一次查询的时候是未支付的状态,放入到map中的5秒内,不对的对redis进行操作,这段时间相当于是死循环的状态,导致Redis的被超频繁的读写。如图:CPU的使用情况稳定在40%-50%这个区间内,在繁忙时期Redis被操作了几千次。
2. Redis的监控情况如下:
3. CPU的监控情况如下:
2.0版本
鉴于1.0版本分布之后,线上出现的高CPU的情况,在满足需求的情况下,我们对系统做了紧急优化,采用的优化方案是,不再频繁的操作redis,而是在第一查询的之后,将订单放入延迟队列(DelayQueue)中。由延迟队列来控制调用的时间间隔。然后,相当于分两条线,第一条线是从redis消费数据,第二条线是从延迟队列中消费数据。这样可以大大减轻对redis的压力。这里选中延迟队列的作用是,延迟队列DelayQueue可以设置元素的过期时间, 如果元素没有达到过期时间则取出来为空,还会把最先过期的元素放在队列的头部。
伪代码如下:
1.定义延迟队列的Delayed接口的实现类OrderVo,使用DelayQueue必须实现Delayed的接口,重写compareTo方法和getDelay方法。
public class OrderVo<T> implements Delayed { /** * 到期时间 单位秒 */ private long activeTime; /** * 存储对象 */ private T obj; /** * 设值时的时间 */ private long currentTime; public OrderVo(long activeTime, T obj) { super(); //将传入的时间转换为超时的时刻 this.activeTime = activeTime; this.currentTime = System.currentTimeMillis(); this.obj = obj; } public T getObj() { return obj; } /** * 返回元素的剩余时间 */ @Override public long getDelay(TimeUnit unit) { // 计算剩余的过期时间 // 大于 0 说明未过期 long expireTime = activeTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime); return expireTime; } /** * 按照剩余时间排序 */ @Override public int compareTo(Delayed o) { // 过期时间长的放置在队列尾部 if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) { return 1; } // 过期时间短的放置在队列头 if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) { return -1; } return 0; } }
2.从Redis中消费订单的代码。
public void run() { Map<String, Object> queryedOrderMap = new HashMap<>(); while (true) { //1.从队列中取出元素 String needCheckStr = redisService.rpop(redisKey); //判断订单放入redis是否超过2分钟 if(订单放入redis超过2分钟){ continue; } //3. 调用通道的接口 boolean result = payApiService.queryPayOrder(queryPayOrderVO); if(!result){ //放入delay队列 orderDelayQueue.offer(new OrderVo<>(RotationConstants.WAIT_TIME, queryPayOrderVO)); } }
3.从延迟队列中消费数据的代码。
while (true) { OrderVo<QueryPayOrderVO> poll = null; try { //1. 这里不能使用take,后面会说明 poll = orderDelayQueue.take(); } catch (InterruptedException e) { log.error("*************休眠出错={}", e); } boolean result = payApiService.queryPayOrder(queryPayOrderVO); if (!result &&订单放入redis不超过2分钟) { //放入delay队列 OrderVo<QueryPayOrderVO> orderVOOrderVo = new OrderVo<>(RotationConstants.WAIT_TIME, finalPoll.getObj()); orderDelayQueue.offer(orderVOOrderVo); } }
终于改造完了,我兴致冲冲的去进行测试。这不测试不知道,一测试下一条,我批量向Redis中插入750条数据,CPU的最高使用率达到了196%。真的是太疯狂了。
真的太尴尬了。越优化性能越差。我的内心慌得一逼。项目经理和开发组长都催着要上线。这可咋办呢?
这到底是啥原因呢?莫得办法,我只得按照下面的方式把线程的堆栈拉下来看看情况。
第一步: 首先通过top -c 显示进程运行信息列表,按下P,进程按照CPU使用率排序 第二步:根据PID查出消耗CPU最高的线程号,通过执行命令 top -Hp PID,显示一个进程的线程运行信息列表 第三步:使用 printf ‘%x\n’ PID (PID为上一步中获取到的线程号)转换成对应的16进制PID 5c7e 第四步:使用jstack 获取对应的线程信息,jstack 8958 | grep 5c7e 第五步:jstack pid(进程pid)>stack.dump
stack.dump文件拉下来之后,我这边查看了下
好多线程都被DelayQueue的take方法阻塞住了。等待他来释放CPU的操作时间片。然后,在看看take方法的源码。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try { //死循环,获取元素 for (;;) { //1.获取头结点元素 E first = q.peek(); if (first == null) //2.获取不到则等待 available.await(); else { long delay = first.getDelay(NANOSECONDS); //元素的剩余时间小于等于0则返回元素 if (delay <= 0) return q.poll(); first = null; //如果元素还未过期,则继续当前线程等待 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { //当前线程等待delay毫秒数 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
这个方法有个最大的问题,就是如果线程A去获取头元素时,首先会进行加锁操作,然后循环去获取元素,头元素未达到过期时间时,他会被wait,知道等待了delay时间,才能被唤醒。然后释放锁。所以其余的线程都被阻塞住了,如果设置的过期时间比较长的话,则会阻塞很长的时间。导致了CPU的使用率维持在一个很高的水平。
所以,这里推荐使用poll方法来获取元素:
poll的方法源码如下:
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
这里poll方法,获取头结点元素,如果为空,或者元素的时间未达到过期时间,则返回为空,否则则正常的返回元素。阻塞的时间很短。
然后,我们将使用take方法的地方替换成poll方法。代码如下所示:
try { poll = orderDelayQueue.poll(); //头元素未过期,睡眠200毫秒 if (poll == null) { Thread.sleep(200); continue; } } catch (InterruptedException e) { log.error("*************休眠出错={}", e); }
经过这么一优化,CPU的使用率果然下降下来了。效果如下:
发布上线之后效果更佳,发布之后,CPU的使用率立马就降下来。
最终的流程图如下图所示:
总结复盘
经过这次问题,让我认识到,还是要敬畏每一行代码,每次代码优化都要进行充足的测试。就像1.0版本里,无脑的给redis队列中push元素,而不考虑redis读写很快,会频繁的操作redis。同样的在2.0版本里,没有经过太多思考的使用take方法。同样会造成潜在的问题。
碰到一个问题之后,多想想,多问问,多多测试。才能有效的避免潜在的坑。