开发者社区 > 云原生 > 云消息队列 > 正文

如何解决消息队列的延时以及过期失效问题?【Java问答学堂】24期

面试题

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

面试官心理分析

你看这问法,其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了;或者消费的速度极其慢。接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是这整个就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?

所以就这事儿,其实线上挺常见的,一般不出,一出就是大 case。一般常见于,举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了;或者是消费端出了个什么岔子,导致消费速度极其慢。

面试题剖析

关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。

大量消息在 mq 里积压了几个小时了还没解决 几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。

一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
  • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  • 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  • 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

mq 中的消息过期失效了 假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。

这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。

假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

mq 都快写满了 如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

往期回顾:

【Java问答学堂】1期 为什么使用消息队列?消息队列有什么优点和缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景?

【Java问答学堂】2期 如何保证消息队列的高可用?

【Java问答学堂】3期 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

【Java问答学堂】4期 如何保证消息的可靠性传输?(如何处理消息丢失的问题?)

【Java问答学堂】5期 如何保证消息的顺序性?

【Java问答学堂】6期 如何解决消息队列的延时以及过期失效问题?

【Java问答学堂】7期 如果让你写一个消息队列,该如何进行架构设计?

【Java问答学堂】8期 es 的分布式架构原理能说一下么(es 是如何实现分布式的啊)?

【Java问答学堂】9期 es 写入数据的工作原理是什么啊?es 查询数据的工作原理是什么啊?

【Java问答学堂】10期 es 在数据量很大的情况下(数十亿级别)如何提高查询效率啊?

【Java问答学堂】11期 es 生产集群的部署架构是什么?每个索引的数据量大概有多少?

【Java问答学堂】12期 项目中缓存是如何使用的?为什么要用缓存?缓存使用不当会造成什么后果?

【Java问答学堂】13期 redis 和 memcached 有什么区别?

【Java问答学堂】14期 redis 都有哪些数据类型?分别在哪些场景下使用比较合适?

【Java问答学堂】15期redis 的过期策略都有哪些?内存淘汰机制都有哪些?

【Java问答学堂】16期如何保证 redis 的高并发和高可用?redis 的主从复制原理能介绍

为什么使用消息队列?【Java问答学堂】17期

消息队列有什么优点和缺点?【Java问答学堂】18期

Kafka、ActiveMQ、RabbitMQ、RocketMQ的区别?【Java问答学堂】19期

如何保证消息队列的高可用?【Java问答学堂】20期

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?【Java问答学堂】21期

如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?【Java问答学堂】22期

如何保证消息的顺序性?【Java问答学堂】23期

展开
收起
请回答1024 2020-05-22 19:09:10 2327 0
1 条回答
写回答
取消 提交回答
  • 收藏

    2020-05-22 19:46:20
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

热门讨论

热门文章

相关电子书

更多
Spring Cloud Alibaba - 重新定义 Java Cloud-Native 立即下载
The Reactive Cloud Native Arch 立即下载
JAVA开发手册1.5.0 立即下载