开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息发送3-选择队列】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12473
消息发送3-选择队列
选择队列
第二步查找路由已经完成,确定路由就是确定要给哪个broker发送消息,broker中有需要messageQueue,所以第三步是查找当前路由中给哪个messageQueue发送信息
DefaultMQProducerImpl代码:
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
selectOneMessageQueue传递了两个参数:topicPublishInfo, lastBrokerName。
lastBrokerName是上一次发送的broker,作用是负载均衡。
点击selectOneMessageQueue,进入MQFaultStrategy代码:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerWame){
if (this.sendLatencyFaultEnable) {
注:sendLatencyFaultEnable判断的是一个布尔类型的参数,默认是FALSE。在进行消息发送时,如果消息发送失败,它有一个默认延迟发送的机制,这功能默认是关闭的。
在默认FALSE的情况下,
return tpInfo.selectoneMessageQueue( lastBrokerName);
点进TopicPublishInfo:
public MessageQueue selectoneMessageQueue() {
//维护索引index
int index = this.sendwhichQueue.getAndIncrement();
//使用索引对messageQueueList取模
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos =0 ;
//利用pos获取队列
例:假如有一个索引,如果第一次为0,使用0对整个MessageQueue长度进行取模,得到0,返回的是第一个,做完之后要Increment。第二次进行取模,1对整个取模取的是第二个。以此类推,索引加到2,2对长度4取模选择的是第二个;如果索引是3,3对整个长度取模取的是第三个。
return this.messageQueueList.get(pos);
}
如果broker延迟机制默认FALSE,则是依次从broker队列取出selectoneMessageQueue并返回,在TopicPublishInfo中依次利用索引对selectoneMessageQueue长度进行取模;
如果lastBrokerNmae不为空(默认不启用broker故障延迟机制):
public MessageQueue selectoneMessageQueue(final string lastBrokerName) {
//第一次选择队列
if (lastBrokerName == null) {
return selectoneMessageQueue(;
} else {
// sendwhichQueue
int index = this.sendwhichQueue. getAndIncrement(;
//遍历消息队列集合
for (int i = 0; i < this.messageQueueList.size(); i++) {
// sendwhichQueue自增后取模
int pos = Math.abs(index++) % this.messageQueueList.size(;
if (pos < 0)
pos = 0;
//规避上次Broker队列,如果二者不同,则进行返回
MessageQueue mq = this.messageQueueList.get(pos);
if ( ! mq. getBrokerName().equals(lastBrokerName)) {
注:这行代码很重要,找到MessageQueue的broker如果和当前lastBrokerName一样,就不进行返回,如果不一样,返回。
return mq;
}
}
//如果以上情况都不满足,返回sendwhichQueue取模后的队列
Return selectOneMessageQueue();
}
public MessageQueue selectoneMessageQueue() {
int index = this.sendwhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos =0 ;
return this.messageQueueList.get(pos) ;
}
如果将sendLatencyFaultEnable默认FALSE改为TRUE:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerWame){
try {
//根据索引在TopicPIblishInfo中选择一个队列
int index = tpInfo.getsendwhichQueue().getAndIncrement();
for (int i = o; i < tpInfo.getlMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < o)
pos = 0;
//队列选择后,校验当前broker是否合法
MessageQueue mq = tpInfo.getMessageQueueList().get(pos ) ;
if (latencyFaultTolerance.isAvailable(mq.getBrokerName( ))) {
//可用
if (null == lastBrokerName || mq.getBrokerName( ).equals(lastBrokerName))
return mq;
}
}
如何通过latencyFaultTolerance进行校验:
代码:LatencyFaultToleranceImpl
@override
public boolean isAvailable(final string name) {
final Faultitem faultItem = this.faultItemTable.get(name) ;if(faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
在LatencyFaultToleranceImpl类中,维护了所有失败的broker表,如果根据name找到faultItem,代表当前不可用;如果没有找到,代表可用,直接取出返回;
如果不可用:
//从LatencyFaultToleranceImpl中的缓存表中取出一个相对较好的broker去发送
final string notBestBroker = latencyFaultTolerance.pickOneAtLeast();
//获得broker的写队列集合
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
//获得一个队列,指定broker和队列ID并返回
final MessageQueue mq = tpInfo.selectoneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName( notBestBroker);
mq.setQueueId(tpInfo.getsendwhichQueue( ).getAndIncrement() % writeQueueNums);
}
return mq;
}
如果打开broker失败延迟机制,过程是先取出,如果broker是可用的,直接返回;如果不可用,代表曾经对broker发送失败,解决策略是从faultItemTable中取出一个相对来说比较好的broker,然后取出写队列,封装一个MessageQueue并返回。
如果没有打开broker失败延迟的机制,则按照索引对messageQueue进行取模,最终再进行返回。