消息发送3-选择队列|学习笔记

简介: 快速学习消息发送3-选择队列

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息发送3-选择队列】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12473


消息发送3-选择队列


选择队列

第二步查找路由已经完成,确定路由就是确定要给哪个broker发送消息,broker中有需要messageQueue,所以第三步是查找当前路由中给哪个messageQueue发送信息

DefaultMQProducerImpl代码:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

selectOneMessageQueue传递了两个参数:topicPublishInfo, lastBrokerName。

lastBrokerName是上一次发送的broker,作用是负载均衡。

点击selectOneMessageQueue,进入MQFaultStrategy代码:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerWame){

if (this.sendLatencyFaultEnable) {

注:sendLatencyFaultEnable判断的是一个布尔类型的参数,默认是FALSE。在进行消息发送时,如果消息发送失败,它有一个默认延迟发送的机制,这功能默认是关闭的。

在默认FALSE的情况下,

return tpInfo.selectoneMessageQueue( lastBrokerName);

点进TopicPublishInfo:

public MessageQueue selectoneMessageQueue() {

//维护索引index

int index = this.sendwhichQueue.getAndIncrement();

//使用索引对messageQueueList取模

int pos = Math.abs(index) % this.messageQueueList.size();

if (pos < 0)

pos =0 ;

//利用pos获取队列

例:假如有一个索引,如果第一次为0,使用0对整个MessageQueue长度进行取模,得到0,返回的是第一个,做完之后要Increment。第二次进行取模,1对整个取模取的是第二个。以此类推,索引加到2,2对长度4取模选择的是第二个;如果索引是3,3对整个长度取模取的是第三个。

image.pngreturn this.messageQueueList.get(pos);

}

如果broker延迟机制默认FALSE,则是依次从broker队列取出selectoneMessageQueue并返回,在TopicPublishInfo中依次利用索引对selectoneMessageQueue长度进行取模;

如果lastBrokerNmae不为空(默认不启用broker故障延迟机制):

public MessageQueue selectoneMessageQueue(final string lastBrokerName) {

//第一次选择队列

if (lastBrokerName == null) {

return selectoneMessageQueue(;

} else {

// sendwhichQueue

int index = this.sendwhichQueue. getAndIncrement(;

//遍历消息队列集合

for (int i = 0; i < this.messageQueueList.size(); i++) {

// sendwhichQueue自增后取模

int pos = Math.abs(index++) % this.messageQueueList.size(;

if (pos < 0)

pos = 0;

//规避上次Broker队列,如果二者不同,则进行返回

MessageQueue mq = this.messageQueueList.get(pos);

if ( ! mq. getBrokerName().equals(lastBrokerName)) {

注:这行代码很重要,找到MessageQueue的broker如果和当前lastBrokerName一样,就不进行返回,如果不一样,返回。

return mq;

}

}

//如果以上情况都不满足,返回sendwhichQueue取模后的队列

Return selectOneMessageQueue();

}

public MessageQueue selectoneMessageQueue() {

int index = this.sendwhichQueue.getAndIncrement();

int pos = Math.abs(index) % this.messageQueueList.size();

if (pos < 0)

pos =0 ;

return this.messageQueueList.get(pos) ;

}

如果将sendLatencyFaultEnable默认FALSE改为TRUE:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,final String lastBrokerWame){

try {

//根据索引在TopicPIblishInfo中选择一个队列

int index = tpInfo.getsendwhichQueue().getAndIncrement();

for (int i = o; i < tpInfo.getlMessageQueueList().size(); i++) {

int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

if (pos < o)

pos = 0;

//队列选择后,校验当前broker是否合法

MessageQueue mq = tpInfo.getMessageQueueList().get(pos ) ;

if (latencyFaultTolerance.isAvailable(mq.getBrokerName( ))) {

//可用

if (null == lastBrokerName || mq.getBrokerName( ).equals(lastBrokerName))

return mq;

}

}

如何通过latencyFaultTolerance进行校验:

代码:LatencyFaultToleranceImpl

@override

public boolean isAvailable(final string name) {

final Faultitem faultItem = this.faultItemTable.get(name) ;if(faultItem != null) {

return faultItem.isAvailable();

}

return true;

}

在LatencyFaultToleranceImpl类中,维护了所有失败的broker表,如果根据name找到faultItem,代表当前不可用;如果没有找到,代表可用,直接取出返回;

如果不可用:

//从LatencyFaultToleranceImpl中的缓存表中取出一个相对较好的broker去发送

final string notBestBroker = latencyFaultTolerance.pickOneAtLeast();

//获得broker的写队列集合

int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

if (writeQueueNums > 0) {

//获得一个队列,指定broker和队列ID并返回

final MessageQueue mq = tpInfo.selectoneMessageQueue();

if (notBestBroker != null) {

mq.setBrokerName( notBestBroker);

mq.setQueueId(tpInfo.getsendwhichQueue( ).getAndIncrement() % writeQueueNums);

}

return mq;

}

如果打开broker失败延迟机制,过程是先取出,如果broker是可用的,直接返回;如果不可用,代表曾经对broker发送失败,解决策略是从faultItemTable中取出一个相对来说比较好的broker,然后取出写队列,封装一个MessageQueue并返回。

如果没有打开broker失败延迟的机制,则按照索引对messageQueue进行取模,最终再进行返回。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件
六、死信队列
六、死信队列
72 0
|
4月前
|
存储 监控 安全
死信队列的死信队列
死信队列的死信队列
|
消息中间件
RabbitMQ的死信队列和延时队列
RabbitMQ的死信队列和延时队列
|
6月前
|
消息中间件 存储 Java
消息队列-死信队列
消息队列-死信队列
69 0
|
7月前
|
消息中间件 编解码 Java
|
消息中间件
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
|
消息中间件 存储
RabbitMQ 发布确认 交换机 死信队列 延迟队列(上)
RabbitMQ 发布确认 交换机 死信队列 延迟队列(上)
95 0
|
消息中间件
RabbitMQ 的死信队列、延迟队列
RabbitMQ 的死信队列、延迟队列
97 0
|
消息中间件
死信队列和延迟队列的介绍
死信队列和延迟队列的介绍
RabbmitMQ学习笔记-死信队列
RabbmitMQ学习笔记-死信队列
65 0
下一篇
DataWorks