阿里二面:RocketMQ 集群 Broker 挂了,会造成什么影响?

简介: 阿里二面:RocketMQ 集群 Broker 挂了,会造成什么影响?

大家好,我是君哥。今天分享 RocketMQ 的 Broker 挂了,会带来什么影响。

面试官:你好,如果 RocketMQ 集群中的一个 Broker 挂了,会造成什么影响呢?

:Broker 挂了,首先会导致 Producer 发送消息失败。对于普通消息,Producer 同步发送的情况下会有重试机制,重试时把消息发送到其他 Broker。如下图,Broker1 宕机了,把消息发送到了 Broker2:

微信图片_20221213121231.png

发送消息的逻辑其实是是一个循环,发送失败后会不断尝试重新发送,代码如下:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
 String lastBrokerName = null == mq ? null : mq.getBrokerName();
 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
 if (mqSelected != null) {
  mq = mqSelected;
  try {
   sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
   switch (communicationMode) {
    case ASYNC:
     return null;
    case ONEWAY:
     return null;
    case SYNC:
     if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
         //如果发送失败了,这里会进行重试
      if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
       continue;
      }
     }
     return sendResult;
    default:
     break;
   }
  } catch (RemotingException e) {
  }//省略其他 catch
 } else {
  break;
 }                                                                                                                                                                                                                                                                                    
}

对于单边消息是不会重试的,因此对于单边消息,就只能发送失败了。而对于同步消息和异步消息,可以通过重试的方式发送到其他的 Broker 上。

面试官:在同步的情况下,Producer 重试时怎么保证不把消息发送到挂掉的 Broker 上呢?

:Producer 默认采用 round-robin 的方式,重试前会记录上一次发送消息的 Broker,然后选择下一个 Broker。代码如下:

//lastBrokerName 记录了上一次发送的 Broker Name
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
 if (lastBrokerName == null) {
  return selectOneMessageQueue();
 } else {
  for (int i = 0; i < this.messageQueueList.size(); i++) {
   int index = this.sendWhichQueue.incrementAndGet();
   int pos = Math.abs(index) % this.messageQueueList.size();
   if (pos < 0)
    pos = 0;
   MessageQueue mq = this.messageQueueList.get(pos);
   //Broker Name 不等于上次的,才会返回
   if (!mq.getBrokerName().equals(lastBrokerName)) {
    return mq;
   }
  }
  return selectOneMessageQueue();
 }
}

面试官:在大流量的场景下,可能会有大量消费发送到失败的 Broker,这样导致大量的消息需要重试,对性能影响会很大,有什么解决方法吗?

:RocketMQ 有延迟隔离策略,如果发送某一个 Broker 失败了,会将其隔离,优先选择正常的 Broker 发送消息。需要注意的是,这个策略默认是不开启的。

面试官:怎么开启延迟隔离策略呢?

:需要在初始化 Producer 的时候定义,见下面代码第二行:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setSendLatencyFaultEnable(true);
producer.start();

开启之后,发送消息时会记录发送消息花费的时间下面 latencyMax 变量,超过一定时间,这个 Broker 就会在一段时间内不允许发送(下面 notAvailableDuration 变量)。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

具体逻辑可以参考类 MQFaultStrategy。

面试官刚刚聊的是对普通消息的影响,那对顺序消息有什么影响呢?

:对于全局顺序消息,如果设置了所有消息要发送到同一个 Broker 的同一个 MessageQueue 中的情况,恰好是这个 Broker 挂了,那就只能等 Broker 重启后再发送了。而对于局部顺序消息,比如同一个订单相关的消息要发送到同一个 Broker 的同一个 MessageQueue 中的情况,如果这个 Broker 挂了,那 MessageQueueSelector 会选择其他 Broker 上的 MessageQueue 进行发送,这会影响当前这笔订单消费的顺序性。而其他订单可以被 Producer 发送到其他的队列中,不受影响。如下图:

微信图片_20221213121300.png

Broker1 挂之前,Order1 的消息发送到了 Broker1,Broker1 挂之后,Order1 的消息被发送到了 Broker2。在 Broker1 恢复前,消费者只能消费 Broker2 上拉取 Order1 的消息,Broker1 恢复后消费者线程再从 Broker1 拉取,因此 Order1 的消息产生乱序。这里假设没有从节点

面试官:Broker 挂了,对 消费者有影响吗?

:如果 Broker 没有设置主从集群,消费会继续从挂掉的 Broker 上拉取,这会导致拉取失败,直到 NameServer 更新了 Broker 列表。

面试官:NameServer 什么时候会更新 Broker 列表呢?

:NameServer 会有每 10s 一次的定时任务检查 Broker 是否下线了,如果 120s 内有没有收到 Broker 心跳,则关闭 channel,把 Broker 信息从本地缓存移除。消费者则默认每隔 30s 向 NameServer 拉取路由信息来刷新本地缓存的 Broker 列表。也就是说可能会有最多 150s 的时间消费者拉取消息失败。如下图:

微信图片_20221213121323.png

面试官:如果 Broker 集群配置了从节点,还会有上面的影响吗?

:如果有从节点,在 Broker 主节点恢复前,生产者是不能往从节点发送消息的,但是消费者可以去从节点拉取消息。

面试官:消费者什么时候会去 Broker 从节点拉取消息呢?

:Broker 挂了以后,消费组会通过向 Name Server 拉取订阅关系来更新本地缓存的 Broker 列表,因为主节点已经不在列表中了,所以会从从节点列表中选择一个 Broker 进项消息拉取。

面试官:如果主节点没有挂,消费者会去从节点拉取消息吗?

:在主节点系统压力较大的时候,消费者也会去从节点拉取消息。可以参考下面的代码:

//DefaultMessageStore 类
//maxOffsetPy:最大物理偏移量
//maxPhyOffsetPulling:这次消息拉取的最大偏移量
//diff:还没有被拉取的消息总长度
long diff = maxOffsetPy - maxPhyOffsetPulling;
//TOTAL_PHYSICAL_MEMORY_SIZE:系统总的物理内存大小
//getAccessMessageInMemoryMaxRatio 默认是 40
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
 * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);

从上面的代码可以看出,当未处理的消息超出物理内存 40% 时就会去从节点拉取。需要注意两点:

  1. 需要设置 slaveReadEnable 参数为 true,才能去从节点读取数据;
  2. 需要配置 whichBrokerWhenConsumeSlowly 参数来决定从哪个从 brokerId 读取。参考下面这段代码:
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
 // consume too slow ,redirect to another machine
 if (getMessageResult.isSuggestPullingFromSlave()) {
     //这里配置从哪个从节点拉取
  responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
 }
 //...
}
  1. brokerId 默认是 0,也就是主节点,如果主节点挂了并且长期启动失败,这个参数也是需要改成可以长期拉取的一个从节点。

面试官:Broker 主节点挂了,如果成功从节点拉取消息,可能会重复消费吗?

:对于广播模式,消息偏移量是保存在消费者本地的,只要消费者不挂,按照内存中的偏移量去从节点拉取就行了,不会有问题。对于集群模式,消息偏移量保存在 Broker,路径如下:

/${rocketmq.client.localOffsetStoreDir}/.rocketmq_offsets/${clientId}/${groupName}/offsets.json

消费者消费完一批消息后,会向 Broker 发送请求更新 Broker 内存中保存的偏移量,内存中的偏移量会定时(每 5s 一次)更新到上面文件中。如果 Broker 主节点不挂,无论消费者从主节点还是从节点拉取消息,更新偏移量的请求都会发送到主节点,从节点会每隔 10s 从主节点同步偏移量,如下图:

微信图片_20221213121348.png

代码如下:

//BrokerController 类 handleSlaveSynchronize
if (role == BrokerRole.SLAVE) {
 slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
   try {
    BrokerController.this.slaveSynchronize.syncAll();
   }
  }
 }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
}

也就是说,如果主节点挂了,去从节点拉取消息,可能因为偏移量没有同步到主节点,从节点保存的偏移量不正确。不过只要消费者不宕机,就会根据消费者本地保存的偏移量去拉取,并不会拉取到重复消息。

面试官:如果 Broker 主节点重启了,主节点并不能同步从节点的最新偏移量,那消费者从主节点读取会读到重复消息吗?

:如果主节点重启了,如果消费者会用本地保存的偏移量去主节点拉取消息,主节点会更新本地的偏移量,同时从节点也会去主节点同步偏移量,所以并不会拉取到重复消息。如果消费者也挂了,消费者重启后 Broker 主节点的偏移量还没有被其他消费者更新过,那确实会拉取到重复消息。

面试官:恭喜你,通过了。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
5月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 canal
阿里面试:canal+MQ,会有乱序的问题吗?
本文详细探讨了在阿里面试中常见的问题——“canal+MQ,会有乱序的问题吗?”以及如何保证RocketMQ消息有序。文章首先介绍了消息有序的基本概念,包括全局有序和局部有序,并分析了RocketMQ中实现消息有序的方法。接着,针对canal+MQ的场景,讨论了如何通过配置`canal.mq.partitionsNum`和`canal.mq.partitionHash`来保证数据同步的有序性。最后,提供了多个与MQ相关的面试题及解决方案,帮助读者更好地准备面试,提升技术水平。
阿里面试:canal+MQ,会有乱序的问题吗?
|
4月前
|
消息中间件 存储 负载均衡
|
4月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
53 2
|
4月前
|
消息中间件 人工智能 监控
|
4月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
95 0
|
4月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
246 0
|
5月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决