一、前言
在上一篇我们介绍了 一文读懂RocketMQ的高可用机制——消息存储高可用,这一篇我们来说一下消息发送是如何保证高可用的。
我们从前面的几篇文章也了解到,RocketMQ 的消息发送机制是通过 NameServer 监听 Broker 集群的心跳,Producer 从 NameServer 中获取 Broker、Topic 以及 ConsumerQueue 等相关信息,然后 Producer 把指定的 Topic 信息发送到指定的 ConsumerQueue 里去。有点抽象是吧,没关系,先来看一张架构图。
看完整体结构,我们再来看下消息发送局部关联的结构图。
相信大家看完这两张结构图,对 RocketMQ 的消息发送机制有个清晰的脉络了。但你可能会问,老周,那 RocketMQ 消息发送高可用机制在哪里体现的呢?不着急,我们往下看。
既然说的是消息发送高可用,那我们就先来看 Producer 发送消息到 Broker 机器过程中可能遇到的问题。比如,网络问题、Broker 宕机等。
而 NameServer 检测 Broker 是有延迟的,我们都知道 NameServer 每隔 10 秒会扫描所有 Broker 信息,但要 Broker 的最后心跳时间超过 120 秒以上才认为 Broker 不可用,所以 Producer 不能及时感知 Broker 下线。如果在这期间消息一直发送失败,那么消息发送失败率会很高,这在业务上是无法接收的。这里大家可能会有一个疑问,为什么 NameServer 不及时检查 Broker 和通知 Producer? 这是因为那样做会使网络通信和架构设计变得非常复杂,而 NameServer 的设计初衷就是尽可能简单,所以这块的高可用方案在 Producer 中来实现。RocketMQ 采用了一些发送端的高可用方案,来解决发送失败的问题,其中最重要的两个设计是重试机制
和故障延迟机制
。
那下面我们就来讲下重试机制和故障延迟机制是如何来保障消息发送高可用的。
二、重试机制
重试机制倒不是什么亮点设计,我们平时写代码的时候也会考虑到,比如调用第三方,第三方接口不通,你还让程序一直调,机器资源就这样白白浪费,所以我们通常会设置重试次数,到了设置的次数还调不通,就走相应的兜底策略或者抛异常。
我们也还是来看下 Producer 的重试机制是怎么处理的。
直接定位到 client 端发送消息的方法:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); // 验证消息格式 Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 从 nameserver 获取 topic 路由 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 同步方式默认重试 3 次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 选择需要发送的 queue,这里后面会详细介绍其中负载均衡的逻辑 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 发送消息到rocketmq sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // 设置 broker 的可用时间和当前延迟,这个在后面负载均衡和Broker故障延迟机制里面会讲到 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); } throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
这里需要说明一点:
如果是网络异常 RemotingException 和客户端异常 MQClientException 会重试,而 Broker 服务端异常 MQBrokerException 和线程中断异常 InterruptedException 则不会再重试,且抛出异常。它这里的粒度就更细了,把相应的异常都涵盖进来了。
在 client 端,发送消息的方式有:同步(SYNC)、异步(ASYNC)、单向(ONEWAY)。
但重试机制仅支持同步发送方式,不支持异步和单向发送方式,默认重试三次。
三、负载均衡机制
Producer 选择 ConsumerQueue 队列的时候默认不启用 Broker 故障延迟机制sendLatencyFaultEnable=false
代码在:
TopicPublishInfo#selectOneMessageQueue(lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 首次选择queue进行发送时,lasterBrokerName为null // 只有当消息发送失败 进行重试时,lastBrokerName才不为null if (lastBrokerName == null) { return selectOneMessageQueue(); } else { // 消息发送失败重试时,走到如下逻辑 int index = this.sendWhichQueue.getAndIncrement(); // 遍历消息队列集合 for (int i = 0; i < this.messageQueueList.size(); i++) { // sendWhichQueue自增后取模 int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; // 规避上次Broker队列 // 重试时,如果还选择到同一个broker,则可能继续失败,所以加此判断 MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // 如果以上情况都不满足,返回sendWhichQueue取模后的队列 return selectOneMessageQueue(); } }
public MessageQueue selectOneMessageQueue() { // sendWhichQueue是一个threadlocal的变量,初始值就是一个随机值,首先随机取一个queue进行消息发送,之后就轮询的进行消息发送 int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
sendWhichQueue 是一个利用 ThreadLocal 本地线程存储自增值的一个类,自增值第一次使用 Random类随机取值,此后如果消息发送触发重试机制,那么每次自增取值。
此方法直接用 sendWhichQueue 自增获取值,再与消息队列的长度进行取模运算,取模目的是为了循环选择消息队列。
如果此时选择的队列发送消息失败了,此时重试机制在再次选择队列时 lastBrokerName 不为空,回到最开始的那个方法,还是利用 sendWhichQueue 自增获取值,但这里多了一个步骤,与消息队列的长度进行取模运算后,如果此时选择的队列所在的 broker 还是上一次选择失败的 broker,则继续选择下一个队列。
我们再细想一下,如果此时有 broker 宕机了,在默认机制下很可能下一次选择的队列还是在已经宕机的 broker,没有办法规避故障的 broker,因此消息发送很可能会再次失败,重试发送造成了不必要的性能损失。
所以 rocketmq 采用 Broker 故障延迟机制来规避故障的 broker,下面我们就来看下故障延迟机制是如何来保障消息发送高可用的。
三、故障延迟机制
默认是不开启故障延迟机制的,开启的话,需要设置sendLatencyFaultEnable=true
。上面有说到,当出现异常触发重试机制 sendDefaultImpl 时,会调用 updateFaultItem 方法来设置 broker 的延迟和可用时间,故障延迟机制就是根据这个时间来进行逻辑处理的。
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 判断是否开启故障延迟功能 if (this.sendLatencyFaultEnable) { try { // 自增取值 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { // 队列位置值取模 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 校验队列是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } // 如果上面的循环选不出message queue,那么则根据每个broker的是否可用,延迟时间,上次消息发送成功时间进行排序,选择一个最优的queue 进行返回 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { // 从失败条目中移除已经恢复的broker latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // 默认机制 return tpInfo.selectOneMessageQueue(lastBrokerName); }
该方法利用 sendWhichQueue 的自增取值的方式轮询选择队列,与默认机制一致,不同的是多了判断是否可用,调用了latencyFaultTolerance.isAvailable(mq.getBrokerName())
判断,既然这样,我们不妨跟一下这个方法。
跟完发现是这样的关系:
所以我们从这几个核心类来入手。
1、延迟机制接口规范
public interface LatencyFaultTolerance<T> { // 更新失败条目 void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); // 判断Broker是否可用 boolean isAvailable(final T name); // 移除Fault条目 void remove(final T name); // 尝试从规避的Broker中选择一个可用的Broker T pickOneAtLeast(); }
2、FaultItem:失败条目
class FaultItem implements Comparable<FaultItem> { // 条目唯一键,这里为brokerName private final String name; // 本次消息发送延迟 private volatile long currentLatency; // 故障规避开始时间 private volatile long startTimestamp; ... }
3、消息失败策略
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog(); private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); // 这就是故障延迟机制selectOneMessageQueue里默认的false private boolean sendLatencyFaultEnable = false; // 根据currentLatency本地消息发送延迟,从latencyMax尾部向前找到第一个比currentLatency小的索引,如果没有找到,返回0 private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; // 根据这个索引从notAvailableDuration取出对应的时间,在该时长内,Broker设置为不可用 private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; ... }
4、原理分析
从发送消息方法源码看出,在发送完消息,会调用 updateFaultItem 方法,代码在:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
// 执行真正的消息发送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
如果上述发送过程出现异常,则调用 DefaultMQProducerImpl#updateFaultItem
,这个是延迟机制的核心方法!
endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { // 参数一:broker名称 // 参数二:本次消息发送延迟时间 // 参数三:是否隔离 this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); }
其中endTimestamp - beginTimestampPrev
等于消息发送需要用到的时间,如果成功发送第三个参数传的是false,发送失败传 true,下面继续看 updateFaultItem 方法的实现源码:
org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
:
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 计算broker规避的时长 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); // 更新该FaultItem规避时长 this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } }
private long computeNotAvailableDuration(final long currentLatency) { // 遍历latencyMax for (int i = latencyMax.length - 1; i >= 0; i--) { // 找到第一个比currentLatency小的latencyMax值 if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } // 没有找到则返回0 return 0; }
其中参数 currentLatency 为本次消息发送的延迟时间,isolation 表示 broker 是否需要规避,所以消息成功发送表示 broker 无需规避,消息发送失败时表示 broker 发生故障了需要规避。
latencyMax 和 notAvailableDuration 是延迟机制算法的核心值,每次发送消息的延迟,它们也决定了失败条目中的 startTimestamp 的值。
从方法可看出,如果 broker 需要隔离,消息发送延迟时间默认为 30 s,再利用这个时间从 latencyMax 尾部向前找到比 currentLatency 小的数组下标 index,如果没有找到就返回 0,我们看看 latencyMax 和 notAvailableDuration 这两个数组的默认值:
// 根据currentLatency本地消息发送延迟,从latencyMax尾部向前找到第一个比currentLatency小的索引,如果没有找到,返回0 private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; // 根据这个索引从notAvailableDuration取出对应的时间,在该时长内,Broker设置为不可用 private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
可看出,如果 isolation=true,该 broker 会得到一个 0 秒的规避时长,如果 isolation=false,那么规避时长就得看消息发送的延迟时间是多少了,我们继续往下看:
public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; }
FaultItem 为存储故障 broker 的类,称为失败条目,每个条目存储了 broker 的名称、消息发送延迟时长、故障规避开始时间。
该方法主要是对失败条目的一些更新操作,如果失败条目已存在,那么更新失败条目,如果失败条目不存在,那么新建失败条目,其中失败条目的 startTimestamp 为当前系统时间加上规避时长,startTimestamp 是判断 broker 是否可用的时间值。
public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; }
public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
如果当前系统时间大于故障规避开始时间,说明 broker 可以继续加入轮询的队伍里了。
四、总结
本来不想源码分析的,但一路分析下来,想要理解透彻消息发送高可用机制,还是得到源码里探本溯源。相信大家从头看下来,收获到了自己想要的答案。最后老周稍微总结一下消息发送高可用的机制。
重试机制,在消息发送出现异常时会尝试再次发送,默认重试三次。重试机制仅支持同步发送方式,不支持异步和单向发送方式,根据发送失败的异常类型处理策略略有不同。
负载均衡机制,Producer 选择 ConsumerQueue 队列的时候默认不启用 Broker 故障延迟机制。如果此时选择的队列发送消息失败了,此时重试机制在再次选择队列时 lastBrokerName 不为空,回到最开始的那个方法,还是利用 sendWhichQueue 自增获取值,但这里多了一个步骤,与消息队列的长度进行取模运算后,如果此时选择的队列所在的 broker 还是上一次选择失败的 broker,则继续选择下一个队列。
故障延迟机制,不得不说,故障延迟机制还真是一个不错的设计。在介绍 NameServer 时提到,NameServer 为了简化和客户端通信,发现 Broker 故障时并不会立即通知客户端。故障规避机制用来解决当 Broker 出现故障,Producer 不能及时感知而导致消息发送失败的问题。默认是不开启的,如果在开启的情况下,消息发送失败的时候会将失败的 Broker 暂时排除在队列选择列表外。规避时间是衰减的,如果 Broker 一直不可用,会被 NameServer 检测到并在 Producer 更新路由信息时进行剔除。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。
喜欢的话,点赞、再看、分享三连。