前言
在RocketMQ
中为,我们创建消息生产者时,只需要设置NameServer
地址,消息就能正确地发送到对应的Broker
中,那么RocketMQ
消息生产者是如何找到Broker
的呢?如果有多个Broker
实例,那么消息发送是如何选择发送到哪个Broker
的呢?
从NameServer查询Topic信息
通过Debug消息发送send()
方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()
这个方法,并且我们找到了最关键的Topic信息:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); 复制代码
这个方法就是通过topic
从NameServer
拉出对应的Broker
信息:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } } 复制代码
1.一开始的话,是从当前缓存中找
Topic
信息,第一次肯定是找不到的;2.找不到
Topic
信息,那么就调用updateTopicRouteInfoFromNameServer(topic)
从NameServer
拉对应的信息,如果拉到了就更新到缓存中;3.如果依然找不到
Topic
信息,说明没有任何Broker
上面是有这个Topic
的;但是我们还要拉开启了自动创建Topic
配置的Broker
信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)
实现;
生产者客户端会从两个地方获取Broker
信息,第一个就是从内存缓存中获取,第二个就是从NameServer
中获取。从NameServer
中分两次获取,一次是获取存在的Topic
对应的Broker
信息,第二次是获取还没有创建出来的Topic
对应的Broker
信息;
如何选择Broker
当客户端拿到了Topic
对应的Broker
信息后,它是如何选择目标Broker
的呢?继续向下看,我们找到了关键代码:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); ...... 复制代码
1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;
2.选择
Broker
的关键代码就在selectOneMessageQueue()
方法中,通过前面拿到的topicPublishInfo
作为参数,lastBrokerName
作为额外的考虑参数;
追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()
中:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); } 复制代码
1.如果开启了延迟故障规避,那么执行规避策略;
- 1.1:轮询找一个
Broker
,该Broker
要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);- 1.2:如果所有的
Broker
都没有度过规避期,那么从比较好的那一部分Broker
里面找一个出来;- 1.3:如果依然没有找到合适的
Broker
,那么就随机选一个Broker
;2.否则就随机选一个
Broker
;
下面我们来看一下随机发送的策略是怎么实现的:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); } 复制代码
1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个
Broker
,其实就是轮询方式;2.如果不是第一次发送消息,那么会尽可能避开上一次的
Broker
服务,也是为了让Broker
服务负载均衡;3.如果没有避开上一次的
Broker
,那么再向后找另一个Broker
;除非只有一个Broker
服务,否则会尽可能避开上次发送的Broker
;
小结
通过源码分析,我们已经知道了生产者是如何选择目标Broker
的了:
1.第一次发消息,通过轮询的方式选择
Broker
;2.后续发消息会规避上次的
Broker
,同样采用轮询的方式选择Broker
;3.在消息发送过程中,存在一个
Broker
规避列表,用户可以通过setSendLatencyFaultEnable(true)
开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker
,如果所有的Broker
都在规避列表中,那么会选择一个相对比较好的Broker
来用;