阿里二面:RocketMQ 消息积压了,增加消费者有用吗?

简介: 阿里二面:RocketMQ 消息积压了,增加消费者有用吗?

大家好,我是君哥。今天分享一道有意思的面试题。

面试官:RocketMQ 消息积压了,增加消费者有用吗?

:这个要看具体的场景,不同的场景下情况是不一样的。

面试官:可以详细说一下吗?

:如果消费者的数量小于 MessageQueue 的数量,增加消费者可以加快消息消费速度,减少消息积压。比如一个 Topic 有 4 个 MessageQueue,2 个消费者进行消费,如果增加一个消费者,明细可以加快拉取消息的频率。如下图:

微信图片_20221213111121.png

如果消费者的数量大于等于 MessageQueue 的数量,增加消费者是没有用的。比如一个 Topic 有 4 个 MessageQueue,并且有 4 个消费者进行消费。如下图

微信图片_20221213111147.png

面试官:你说的第一种情况,增加消费者一定能加快消息消费的速度吗?

:这...,一般情况下是可以的。

面试官:有特殊的情况吗?

:当然有。消费者消息拉取的速度也取决于本地消息的消费速度,如果本地消息消费的慢,就会延迟一段时间后再去拉取。

面试官:在什么情况下消费者会延迟一段时间后后再去拉取呢?

:消费者拉取的消息存在 ProcessQueue,消费者是有流量控制的,如果出现下面三种情况,就不会主动去拉取:

  • ProcessQueue 保存的消息数量超过阈值(默认 1000,可以配置);
  • ProcessQueue 保存的消息大小超过阈值(默认 100M,可以配置);
  • 对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000,可以配置)。

这部分源码请参考类:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl。

面试官:还有其他情况吗?

:对于顺序消费的场景,ProcessQueue 加锁失败,也会延迟拉取,这个延迟时间是 3s。

面试官:消费者延迟拉取消息,一般可能是什么原因导致的呢?

:其实延迟拉取的本质就是消费者消费慢,导致下次去拉取的时候 ProcessQueue 中积压的消息超过阈值。以下面这张架构图为例:

微信图片_20221213111224.png

消费者消费慢,可是能下面的原因:

  • 消费者处理的业务逻辑复杂,耗时很长;
  • 消费者有慢查询,或者数据库负载高导致响应慢;
  • 缓存等中间件响应慢,比如 Redis 响应慢;
  • 调用外部服务接口响应慢。

面试官:对于外部接口响应慢的情况,有什么应对措施吗?

:这个要分情况讨论。

如果调用外部系统只是一个通知,或者调用外部接口的结果并不处理,可以采用异步的方式,异步逻辑里采用重试的方式保证接口调成功。

如果外部接口返回结果必须要处理,可以考虑接口返回的结果是否可以缓存默认值(要考虑业务可行),在调用失败后采用快速降级的方式,使用默认值替代返回接口返回值。

如果这个接口返回结果必须要处理,并且不能缓存,可以把拉取到的消息存入本地然后给 Broker 直接返回 CONSUME_SUCCESS。等外部系统恢复正常后再从本地取出来进行处理。

面试官:如果消费者数小于 MessageQueue 数量,并且外部系统响应正常,为了快速消费积压消息而增加消费者,有什么需要考虑的吗?

:外部系统虽然响应正常,但是增加多个消费者后,外部系统的接口调用量会突增,如果达到吞吐量上限,外部系统会响应变慢,甚至被打挂。

同时也要考虑本地数据库、缓存的压力,如果数据库响应变慢,处理消息的速度就会变慢,起不到缓解消息积压的作用。

面试官:新增加了消费者后,怎么给它分配 MessageQueue 呢?

:Consumer 在拉取消息之前,需要对 MessageQueue 进行负载操作。RocketMQ 使用一个定时器来完成负载操作,默认每间隔 20s 重新负载一次。

面试官:能详细说一下都有哪些负载策略吗?

:RocketMQ 提供了 6 种负载策略,依次来看一下。

平均负载策略

  1. 把消费者进行排序;
  2. 计算每个消费者可以平均分配的 MessageQueue 数量;
  3. 如果消费者数量大于 MessageQueue 数量,多出的消费者就分不到;
  4. 如果不可以平分,就使用 MessageQueue 总数量对消费者数量求余数 mod;
  5. 对前 mod 数量消费者,每个消费者加一个,这样就获取到了每个消费者分配的 MessageQueue 数量。

比如 4 个 MessageQueue 和 3 个消费者的情况:

微信图片_20221213111254.png

源代码的逻辑非常简单,如下:

// AllocateMessageQueueAveragely 这个类
// 4 个 MessageQueue 和 3 个消费者的情况,假如第一个,index = 0
int index = cidAll.indexOf(currentCID);
// mod = 1
int mod = mqAll.size() % cidAll.size();
// averageSize = 2
int averageSize =
    mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                                         + 1 : mqAll.size() / cidAll.size());
// startIndex = 0
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// range = 2,所以第一个消费者分配到了2个
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

循环分配策略:

这个很容易理解,遍历消费者,把 MessageQueue 分一个给遍历到的消费者,如果 MessageQueue 数量比消费者多,需要进行多次遍历,遍历次数等于 (MessageQueue 数量/消费者数量),还是以 4 个 MessageQueue 和 3 个消费者的情况,如下图:

微信图片_20221213111317.png

源代码如下:

//AllocateMessageQueueAveragelyByCircle 这个类
//4 个 MessageQueue 和 3 个消费者的情况,假如第一个,index = 0
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
    if (i % cidAll.size() == index) {
        //i == 0 或者 i == 3 都会走到这里
        result.add(mqAll.get(i));
    }
}

自定义分配策略

这种策略在消费者启动的时候可以指定消费哪些 MessageQueue。可以参考下面代码:

AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
//绑定消费 messageQueue1
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1","broker1",0)));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
consumer.start();

按照机房分配策略

这种方式 Consumer 只消费指定机房的 MessageQueue,如下图:Consumer0、Consumer1、Consumer2 绑定 room1 和 room2 这两个机房,而 room3 这个机房没有消费者。

微信图片_20221213111341.png

Consumer 启动的时候需要绑定机房名称。可以参考下面代码:

AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();
//绑定消费 room1 和 room2 这两个机房
allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1","room2")));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);
consumer.start();

这种策略 broker 的命名必须按照格式:机房名@brokerName,因为消费者分配队列的时候,首先按照机房名称过滤出所有的 MessageQueue,然后再按照平均分配策略进行分配

//AllocateMessageQueueByMachineRoom 这个类
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
    String[] temp = mq.getBrokerName().split("@");
    if (temp.length == 2 && consumeridcs.contains(temp[0])) {
        premqAll.add(mq);
    }
}
//上面按照机房名称过滤出所有的 MessageQueue 放入premqAll,后面就是平均分配策略

按照机房就近分配

跟按照机房分配原则相比,就近分配的好处是可以对没有消费者的机房进行分配。如下图,机房 3 的 MessageQueue 也分配到了消费者:

微信图片_20221213111406.png

如果一个机房没有消费者,则会把这个机房的 MessageQueue 分配给集群中所有的消费者。

源码所在类:AllocateMachineRoomNearby。

一致性 Hash 算法策略

把所有的消费者经过 Hash 计算分布到 Hash 环上,对所有的 MessageQueue  进行 Hash  计算,找到顺时针方向最近的消费者节点进行绑定。如下图:

微信图片_20221213111432.png

源代码如下:

//所在类 AllocateMessageQueueConsistentHash
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
    cidNodes.add(new ClientNode(cid));
}
//使用消费者构建 Hash 环,把消费者分布在 Hash 环节点上
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
    router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
    router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
//对 MessageQueue 做 Hash 运算,找到环上距离最近的消费者
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
    ClientNode clientNode = router.routeNode(mq.toString());
    if (clientNode != null && currentCID.equals(clientNode.getKey())) {
        results.add(mq);
    }
}

面试官:恭喜你,通过了。

       -------------end---------------

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
物联网
MQTT常见问题之用单片机接入阿里MQTT实例失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
消息中间件 存储 监控
|
29天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
1月前
|
消息中间件 存储 canal
阿里面试:canal+MQ,会有乱序的问题吗?
本文详细探讨了在阿里面试中常见的问题——“canal+MQ,会有乱序的问题吗?”以及如何保证RocketMQ消息有序。文章首先介绍了消息有序的基本概念,包括全局有序和局部有序,并分析了RocketMQ中实现消息有序的方法。接着,针对canal+MQ的场景,讨论了如何通过配置`canal.mq.partitionsNum`和`canal.mq.partitionHash`来保证数据同步的有序性。最后,提供了多个与MQ相关的面试题及解决方案,帮助读者更好地准备面试,提升技术水平。
阿里面试:canal+MQ,会有乱序的问题吗?
|
3月前
|
消息中间件 人工智能 监控
|
5月前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
166 1
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
|
5月前
|
消息中间件 存储 监控
【消息中间件】详解mq消息积压
【消息中间件】详解mq消息积压
199 0
|
消息中间件 中间件 Kafka
限时开源!阿里内部消息中间件合集:MQ+Kafka+体系图+笔记
近好多小伙伴说在准备金三银四的面试突击了,但是遇到消息中间件不知道该怎么学了,问我有没有成体系的消息中间件的学习方式。 额,有点不知所措,于是乎小编就想着做一次消息中间件的专题,归类整理了一些纯手绘知识体系图、面试以及相关的学习笔记。
235 1
|
消息中间件 程序员 Apache
阿里RocketMQ创始人首次分享出这份RocketMQ技术内幕神级架构手册
RocketMQ的发展史? RocketMQ的开源正是源于对这种开源文化的认同,开放是为了更好的协同创新,并将这一技术推向新的高度。在经历了阿里巴巴集团内部多年“双11”交易核心链路工业级场景在验证,2016年11月,团队将RocketMQ捐献给全球享有盛誉的Apache软件基金会正式质为孵化项目。 至此,RocketMQ开启了迈向全球顶级开源软件的新征程。
|
消息中间件 RocketMQ
RocketMQ消费者没有成功消费消息的问题排查
RocketMQ消费者没有成功消费消息的问题排查
1567 1