RocketMQ消息发送的高可用设计

简介: 从rocketmq topic的创建机制可知,一个topic对应有多个消息队列,那么我们在发送消息时,是如何选择消息队列进行发送的?假如这时有broker宕机了,rocketmq是如何规避故障broker的?看完这篇文章,相信你会从文中找到答案。

从rocketmq topic的创建机制可知,一个topic对应有多个消息队列,那么我们在发送消息时,是如何选择消息队列进行发送的?假如这时有broker宕机了,rocketmq是如何规避故障broker的?看完这篇文章,相信你会从文中找到答案。


rocketmq在发送消息时,由于nameserver检测broker是否还存活是有延迟的,在选择消息队列时难免会遇到已经宕机的broker,又或者因为网络原因发送失败的,因此rocketmq采取了一些高可用设计的方案,主要通过两个手段:重试与Broker规避。


重试机制



直接定位到client端发送消息的方法:


org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:

1int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
1for (; times < timesTotal; times++) {
2    // ...
3}


在client端,发送消息的方式有:同步(SYNC)、异步(ASYNC)、单向(ONEWAY)。


那么可以知道,retryTimesWhenSendFailed决定同步方法重试次数,默认重试次数为3次。


重试机制提高了消息发送的成功率。


选择队列的方式



我在这里提出一个问题:


现在有个由两个broker节点组成的集群,有topic1,默认在每个broker上创建4个队列,分别是:master-a(q0,q1,q2,q3)、master-b(q0,q1,q2,q3),上一次发送消息到master-a的q0队列,此时master-a宕机了,如果继续发送topic1消息,rocketmq如果避免再次发送到master-a?


以上问题引出了rocketmq发送消息时如何选择队列的一些机制,选择队列有两种方式,通过sendLatencyFaultEnable的值来控制,默认值为false,不启动broker故障延迟机制,值为true时启用broker故障延迟机制。


1. 默认机制


sendLatencyFaultEnable=false,消息发送选择队列调用以下方法:


org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue:

1public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
 2  if (lastBrokerName == null) {
 3    return selectOneMessageQueue();
 4  } else {
 5    int index = this.sendWhichQueue.getAndIncrement();
 6    for (int i = 0; i < this.messageQueueList.size(); i++) {
 7      int pos = Math.abs(index++) % this.messageQueueList.size();
 8      if (pos < 0)
 9        pos = 0;
10      MessageQueue mq = this.messageQueueList.get(pos);
11      if (!mq.getBrokerName().equals(lastBrokerName)) {
12        return mq;
13      }
14    }
15    return selectOneMessageQueue();
16  }
17}


这里的lastBrokerName指的是上一次执行消息发送时选择失败的broker,在重试机制下,第一次执行消息发送时,lastBrokerName=null,直接选择以下方法:


org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue:

1public MessageQueue selectOneMessageQueue() {
2  int index = this.sendWhichQueue.getAndIncrement();
3  int pos = Math.abs(index) % this.messageQueueList.size();
4  if (pos < 0)
5    pos = 0;
6  return this.messageQueueList.get(pos);
7}


sendWhichQueue是一个利用ThreadLocal本地线程存储自增值的一个类,自增值第一次使用Random类随机取值,此后如果消息发送出发重试机制,那么每次自增取值。


此方法直接用sendWhichQueue自增获取值,再与消息队列的长度进行取模运算,取模目的是为了循环选择消息队列。


如果此时选择的队列发送消息失败了,此时重试机制在再次选择队列时lastBrokerName不为空,回到最开始的那个方法,还是利用sendWhichQueue自增获取值,但这里多了一个步骤,与消息队列的长度进行取模运算后,如果此时选择的队列所在的broker还是上一次选择失败的broker,则继续选择下一个队列。


我们再细想一下,如果此时有broker宕机了,在默认机制下很可能下一次选择的队列还是在已经宕机的broker,没有办法规避故障的broker,因此消息发送很可能会再次失败,重试发送造成了不必要的性能损失。


所以rocketmq采用Broker故障延迟机制来规避故障的broker。


2. Broker故障延迟机制


sendLatencyFaultEnable=true,消息发送选择队列调用以下方法:


org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue:

1public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
 2  // Broker故障延迟机制
 3  if (this.sendLatencyFaultEnable) {
 4    try {
 5      // 自增取值
 6      int index = tpInfo.getSendWhichQueue().getAndIncrement();
 7      for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
 8        // 队列位置值取模
 9        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
10        if (pos < 0)
11          pos = 0;
12        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
13        // 校验队列是否可用
14        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
15          if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
16            return mq;
17        }
18      }
19
20      // 尝试从失败的broker列表中选择一个可用的broker
21      final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
22      int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
23      if (writeQueueNums > 0) {
24        final MessageQueue mq = tpInfo.selectOneMessageQueue();
25        if (notBestBroker != null) {
26          mq.setBrokerName(notBestBroker);
27          mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
28        }
29        return mq;
30      } else {
31        // 从失败条目中移除已经恢复的broker
32        latencyFaultTolerance.remove(notBestBroker);
33      }
34    } catch (Exception e) {
35      log.error("Error occurred when selecting message queue", e);
36    }
37
38    return tpInfo.selectOneMessageQueue();
39  }
40
41  // 默认机制
42  return tpInfo.selectOneMessageQueue(lastBrokerName);
43}


该方法利用sendWhichQueue的自增取值的方式轮询选择队列,与默认机制一致,不同的是多了判断是否可用,调用了:


latencyFaultTolerance.isAvailable(mq.getBrokerName());

来判断,其中肯定内涵机关,所以我们需要从延迟机制的几个核心类找突破口。


下面我会从源码的角度详细地分析rocketmq是如何实现在一定时间内规避故障broker的,从发送消息方法源码看出,在发送完消息,会调用updateFaultItem方法:


org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:

1// 3.执行真正的消息发送
2sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
3endTimestamp = System.currentTimeMillis();
4this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);


发送消息时捕捉到异常同样会调用updateFaultItem方法,它是延迟机制的核心方法:

1endTimestamp = System.currentTimeMillis();
2this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);


其中endTimestamp - beginTimestampPrev等于消息发送需要用到的时间,如果成功发送第三个参数传的是false,发送失败传true,下面继续看updateFaultItem方法的实现源码:


org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem:

1public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
2  if (this.sendLatencyFaultEnable) {
3    long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
4    this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
5  }
6}
1private long computeNotAvailableDuration(final long currentLatency) {
2  for (int i = latencyMax.length - 1; i >= 0; i--) {
3    if (currentLatency >= latencyMax[i])
4      return this.notAvailableDuration[i];
5  }
6  return 0;
7}


其中参数currentLatency为本次消息发送的延迟时间,isolation表示broker是否需要规避,所以消息成功发送表示broker无需规避,消息发送失败时表示broker发生故障了需要规避。


latencyMax和notAvailableDuration是延迟机制算法的核心值,每次发送消息的延迟,它们也决定了失败条目中的startTimestamp的值。


从方法可看出,如果broker需要隔离,消息发送延迟时间默认为30s,再利用这个时间从latencyMax尾部向前找到比currentLatency小的数组下标index,如果没有找到就返回0,我们看看latencyMax和notAvailableDuration这两个数组的默认值:


1private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
2private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};


可看出,如果isolation=true,该broker会得到一个10分钟规避时长,如果isolation=false,那么规避时长就得看消息发送的延迟时间是多少了,我们继续往下撸:

org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem:

1public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
 2  // 从缓存中获取失败条目
 3  FaultItem old = this.faultItemTable.get(name);
 4  if (null == old) {
 5    // 如果缓存不存在,新建失败条目
 6    final FaultItem faultItem = new FaultItem(name);
 7    faultItem.setCurrentLatency(currentLatency);
 8    // broker开始可用时间=当前时间+规避时长
 9    faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
10
11    old = this.faultItemTable.putIfAbsent(name, faultItem);
12    if (old != null) {
13      // 更新旧的失败条目
14      old.setCurrentLatency(currentLatency);
15      old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
16    }
17  } else {
18    // 更新旧的失败条目
19    old.setCurrentLatency(currentLatency);
20    old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
21  }
22}


FaultItem为存储故障broker的类,称为失败条目,每个条目存储了broker的名称、消息发送延迟时长、故障规避开始时间。


该方法主要是对失败条目的一些更新操作,如果失败条目已存在,那么更新失败条目,如果失败条目不存在,那么新建失败条目,其中失败条目的startTimestamp为当前系统时间加上规避时长,startTimestamp是判断broker是否可用的时间值:


org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable:

1public boolean isAvailable() {
2  return (System.currentTimeMillis() - startTimestamp) >= 0;
3}


如果当前系统时间大于故障规避开始时间,说明broker可以继续加入轮询的队伍里了。


写在最后



经过一波源码的分析,我相信你已经找到了你想要的答案,这个故障延迟机制真的是一个很好的设计,我们看源码不仅仅要看爽,爽了之后我们还要思考一下这些优秀的设计思想在平时写代码的过程中是否可以借鉴一下?

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
369 1
|
消息中间件 存储 Kafka
如何保证MQ消息队列的高可用?
如何保证MQ消息队列的高可用?
270 0
|
3月前
|
消息中间件 存储 算法
一文详解 RocketMQ 如何利用 Raft 进行高可用保障
本文介绍 RocketMQ 如何利用 Raft(一种简单有效的分布式一致性算法)进行高可用的保障,总结了 RocketMQ 与 Raft 的前世今生。可以说 Raft 的设计给 RocketMQ 的高可用注入了非常多的养分,RocketMQ 的共识算法与高可用设计在 2023 年也得到了学术界的认可,被 CCF-A 类学术会议 ASE 23' 录用。
434 11
|
6月前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1337 3
|
6月前
|
消息中间件 运维 应用服务中间件
容器化运维:构建高可用RabbitMQ集群的Docker Compose指南
容器化运维:构建高可用RabbitMQ集群的Docker Compose指南
907 1
|
消息中间件 Java Spring
搭建高可用rabbitmq集群及spring boot实现集群配置
搭建高可用rabbitmq集群及spring boot实现集群配置
229 0
|
存储 消息中间件 缓存
一文读懂RocketMQ的高可用机制——消息存储高可用
一文读懂RocketMQ的高可用机制——消息存储高可用
1128 1
|
消息中间件 存储 缓存
一文读懂RocketMQ的高可用机制——集群管理高可用
一文读懂RocketMQ的高可用机制——集群管理高可用
2806 1
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息消费高可用
一文读懂RocketMQ的高可用机制——消息消费高可用
880 1
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
78 0