RocketMQ入门到入土(六)发消息的时候选择queue的算法有哪些?(下)

简介: RocketMQ入门到入土(六)发消息的时候选择queue的算法有哪些?(下)

2.2.1、不启用broker故障延迟


既然sendLatencyFaultEnable默认是false,那就先看当sendLatencyFaultEnable=false时候的逻辑


public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 第一次就是null,第二次(也就是重试的时候)就不是null了。
    if (lastBrokerName == null) {
        // 第一次选择队列的逻辑
        return selectOneMessageQueue();
    } else {
        // 第一次选择队列发送消息失败了,第二次重试的时候选择队列的逻辑
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
   // 过滤掉上次发送消息失败的队列
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}


那就继续看第一次选择队列的逻辑:


public MessageQueue selectOneMessageQueue() {
    // 当前线程有个ThreadLocal变量,存放了一个随机数 {@link org.apache.rocketmq.client.common.ThreadLocalIndex#getAndIncrement}
    // 然后取出随机数根据队列长度取模且将随机数+1
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0) {
        pos = 0;
    }
    return this.messageQueueList.get(pos);
}

好吧,其实也有点随机一个的意思。但是亮点在于取出随机数根据队列长度取模且将随机数+1,这个+1亮了(getAndIncrement cas +1)。


当消息第一次发送失败时,lastBrokerName会存放当前选择失败的broker(mq = mqSelected),通过重试,此时lastBrokerName有值,代表上次选择的boker发送失败,则重新对sendWhichQueue本地线程变量+1,遍历选择消息队列,直到不是上次的broker,也就是为了规避上次发送失败的broker的逻辑所在。


举个例子:你这次随机数是1,队列长度是4,1%4=1,这时候失败了,进入重试,那么重试之前,也就是在上一步1%4之后,他把1进行了++操作,变成了2,那么你这次重试的时候就是2%4=2,直接过滤掉了刚才失败的broker。


那就继续看第二次重试选择队列的逻辑:


// +1
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
    // 取模
    int pos = Math.abs(index++) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    MessageQueue mq = this.messageQueueList.get(pos);
    // 过滤掉上次发送消息失败的队列
    if (!mq.getBrokerName().equals(lastBrokerName)) {
        return mq;
    }
}
// 没找到能用的queue的话继续走默认的那个
return selectOneMessageQueue();


so easy,你上次不是失败了,进入我这里重试来了吗?我也很简单,我就还是取出随机数+1然后取模队列长度,我看这个broker是不是上次失败的那个,是他小子的话就过滤掉,继续遍历queue找下一个能用的。


2.2.2、启用broker故障延迟


也就是下面if里的逻辑


if (this.sendLatencyFaultEnable) {
    ....
}


看上面的注释就行了,很清晰了,就是我先(随机数 +1) % queue.size(),然后看你这个queue所属的broker是否可用,可用的话且不是重试进来的或失败重试的情况,如果和选择的队列是上次重试是一样的,那直接return你就完事了。那么怎么看broker是否可用的呢?



// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable(String)}
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable()}
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}


疑问:


  • faultItemTable是什么时候放进去的?


  • isAvailable() 为什么只是判断一个时间就可以知道Broker是否可用?


这就需要上面发送消息完成后所调用的这个方法了:


// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem}
// 发送开始时间
beginTimestampPrev = System.currentTimeMillis();
// 进行发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
// 发送结束时间
endTimestamp = System.currentTimeMillis();
// 更新broker的延迟情况
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);


细节逻辑如下:


// {@link org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem}
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // 首次isolation传入的是false,currentLatency是发送消息所耗费的时间,如下
        // this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
// 根据延迟时间对比MQFaultStrategy中的延迟级别数组latencyMax 不可用时长数组notAvailableDuration 来将该broker加进faultItemTable中。
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        // 假设currentLatency花费了10ms,那么latencyMax里的数据显然不符合下面的所有判断,所以直接return 0;
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem()}
@Override
// 其实主要就是给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
// 也就是说只有notAvailableDuration == 0的时候,isAvailable()才会返回true。
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}


下面这两句代码详细解释下:


private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyMax notAvailableDuration
50L 0L
100L 0L
550L 30000L
1000L 60000L
2000L 120000L
3000L 180000L
15000L 600000L



  • currentLatency大于等于50小于100,则notAvailableDuration为0
  • currentLatency大于等于100小于550,则notAvailableDuration为0
  • currentLatency大于等于550小于1000,则notAvailableDuration为300000
  • …等等


再来举个例子:


假设isolation传入true,


long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);


那么notAvailableDuration将传入600000L。结合isAvailable方法,大概流程如下:


RocketMQ为每个Broker预测了个可用时间(当前时间+notAvailableDuration),当当前时间大于该时间,才代表Broker可用,而notAvailableDuration有6个级别和latencyMax的区间一一对应,根据传入的currentLatency去预测该Broker在什么时候可用。


所以再来看这个


public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}


根据执行时间来看落入哪个区间,在0~100的时间内notAvailableDuration都是0,都是可用的,大于该值后,可用的时间就会开始变大了,就认为不是最优解,直接舍弃。


2.3、调用链


org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(xxx)
->
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(xxx) 
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)


2.4、总结


  • 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
  • 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
  • 如果上次失败的Broker可用那么还是会选择该Broker的队列
  • 如果上述情况失败,则随机选择一个进行发送
  • 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间


三、总结


1、疑问


他搞了两个重载send()方法,一个支持算法选择器,一个不支持算法选择,queue的算法选择是个典型的策略模式。为什么send(message)方法内置的queue选择算法不抽出到单独的类中,然后此类实现

org.apache.rocketmq.client.producer.MessageQueueSelector接口呢?比如叫:SelectMessageQueueByBest,比如如下:


public class org.apache.rocketmq.client.producer.DefaultMQProducer {
    // 只发送消息,queue的选择由默认的算法来实现
    @Override
    public SendResult send(Collection<Message> msgs) {
        this.send(msgs, new SelectMessageQueueByBest().select(xxx));
    }
    // 自定义选择queue的算法进行发消息
    @Override
    public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {}
}

我猜测可能是这个算法过于复杂,与其它类的交互也过于多,参数也可能和内置的其他三个不同,所以没搞到一起,但是还是搞到一起规范呀,干的同一件事,只是算法不同,很典型的策略模式。


2、面试


问:发消息的时候选择queue的算法有哪些?

答:分为两种,一种是直接发消息,不能选择queue,这种的queue选择算法如下:

  • 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
  • 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
  • 如果上次失败的Broker可用那么还是会选择该Broker的队列
  • 如果上述情况失败,则随机选择一个进行发送
  • 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间

另外一种是发消息的时候可以选择算法甚至还可以实现接口自定义算法:

  • SelectMessageQueueByRandom:随机
  • SelectMessageQueueByHash:hash
  • SelectMessageQueueByMachineRoom
  • 实现接口自定义

END

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
机器学习/深度学习 人工智能 算法
AI入门必读:Java实现常见AI算法及实际应用,有两下子!
本文全面介绍了人工智能(AI)的基础知识、操作教程、算法实现及其在实际项目中的应用。首先,从AI的概念出发,解释了AI如何使机器具备学习、思考、决策和交流的能力,并列举了日常生活中的常见应用场景,如手机助手、推荐系统、自动驾驶等。接着,详细介绍了AI在提高效率、增强用户体验、促进技术创新和解决复杂问题等方面的显著作用,同时展望了AI的未来发展趋势,包括自我学习能力的提升、人机协作的增强、伦理法规的完善以及行业垂直化应用的拓展等...
146 3
AI入门必读:Java实现常见AI算法及实际应用,有两下子!
|
27天前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
27 0
分享一下rocketmq入门小知识
|
1月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
123 2
|
2月前
|
消息中间件
云消息队列RabbitMQ版入门训练营 打卡领好礼
云消息队列RabbitMQ版入门训练营 打卡领好礼
37 3
|
2月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
|
2月前
|
消息中间件 存储 缓存
MetaQ/RocketMQ 原理问题之Consume queue中的条目长度是固定的问题如何解决
MetaQ/RocketMQ 原理问题之Consume queue中的条目长度是固定的问题如何解决
|
2月前
|
机器学习/深度学习 数据采集 人工智能
机器学习算法入门与实践
【7月更文挑战第22天】机器学习算法入门与实践是一个既充满挑战又极具吸引力的过程。通过掌握基础知识、理解常见算法、注重数据预处理和模型选择、持续学习新技术和参与实践项目,你可以逐步提高自己的机器学习技能,并在实际应用中取得优异的成绩。记住,机器学习是一个不断迭代和改进的过程,保持好奇心和耐心,你将在这个领域走得更远。
|
2月前
|
消息中间件 存储 算法
实战算法的基础入门(2)
实战算法的基础入门
|
1月前
|
存储 算法
【C算法】编程初学者入门训练140道(1~20)
【C算法】编程初学者入门训练140道(1~20)
|
2月前
|
算法 Java
实战算法的基础入门(3)
实战算法的基础入门