RocketMQ入门到入土(六)发消息的时候选择queue的算法有哪些?(下)

简介: RocketMQ入门到入土(六)发消息的时候选择queue的算法有哪些?(下)

2.2.1、不启用broker故障延迟


既然sendLatencyFaultEnable默认是false,那就先看当sendLatencyFaultEnable=false时候的逻辑


public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 第一次就是null,第二次(也就是重试的时候)就不是null了。
    if (lastBrokerName == null) {
        // 第一次选择队列的逻辑
        return selectOneMessageQueue();
    } else {
        // 第一次选择队列发送消息失败了,第二次重试的时候选择队列的逻辑
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
   // 过滤掉上次发送消息失败的队列
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}


那就继续看第一次选择队列的逻辑:


public MessageQueue selectOneMessageQueue() {
    // 当前线程有个ThreadLocal变量,存放了一个随机数 {@link org.apache.rocketmq.client.common.ThreadLocalIndex#getAndIncrement}
    // 然后取出随机数根据队列长度取模且将随机数+1
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0) {
        pos = 0;
    }
    return this.messageQueueList.get(pos);
}

好吧,其实也有点随机一个的意思。但是亮点在于取出随机数根据队列长度取模且将随机数+1,这个+1亮了(getAndIncrement cas +1)。


当消息第一次发送失败时,lastBrokerName会存放当前选择失败的broker(mq = mqSelected),通过重试,此时lastBrokerName有值,代表上次选择的boker发送失败,则重新对sendWhichQueue本地线程变量+1,遍历选择消息队列,直到不是上次的broker,也就是为了规避上次发送失败的broker的逻辑所在。


举个例子:你这次随机数是1,队列长度是4,1%4=1,这时候失败了,进入重试,那么重试之前,也就是在上一步1%4之后,他把1进行了++操作,变成了2,那么你这次重试的时候就是2%4=2,直接过滤掉了刚才失败的broker。


那就继续看第二次重试选择队列的逻辑:


// +1
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
    // 取模
    int pos = Math.abs(index++) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    MessageQueue mq = this.messageQueueList.get(pos);
    // 过滤掉上次发送消息失败的队列
    if (!mq.getBrokerName().equals(lastBrokerName)) {
        return mq;
    }
}
// 没找到能用的queue的话继续走默认的那个
return selectOneMessageQueue();


so easy,你上次不是失败了,进入我这里重试来了吗?我也很简单,我就还是取出随机数+1然后取模队列长度,我看这个broker是不是上次失败的那个,是他小子的话就过滤掉,继续遍历queue找下一个能用的。


2.2.2、启用broker故障延迟


也就是下面if里的逻辑


if (this.sendLatencyFaultEnable) {
    ....
}


看上面的注释就行了,很清晰了,就是我先(随机数 +1) % queue.size(),然后看你这个queue所属的broker是否可用,可用的话且不是重试进来的或失败重试的情况,如果和选择的队列是上次重试是一样的,那直接return你就完事了。那么怎么看broker是否可用的呢?



// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable(String)}
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable()}
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}


疑问:


  • faultItemTable是什么时候放进去的?


  • isAvailable() 为什么只是判断一个时间就可以知道Broker是否可用?


这就需要上面发送消息完成后所调用的这个方法了:


// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem}
// 发送开始时间
beginTimestampPrev = System.currentTimeMillis();
// 进行发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
// 发送结束时间
endTimestamp = System.currentTimeMillis();
// 更新broker的延迟情况
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);


细节逻辑如下:


// {@link org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem}
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // 首次isolation传入的是false,currentLatency是发送消息所耗费的时间,如下
        // this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
// 根据延迟时间对比MQFaultStrategy中的延迟级别数组latencyMax 不可用时长数组notAvailableDuration 来将该broker加进faultItemTable中。
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        // 假设currentLatency花费了10ms,那么latencyMax里的数据显然不符合下面的所有判断,所以直接return 0;
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem()}
@Override
// 其实主要就是给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
// 也就是说只有notAvailableDuration == 0的时候,isAvailable()才会返回true。
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}


下面这两句代码详细解释下:


private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyMax notAvailableDuration
50L 0L
100L 0L
550L 30000L
1000L 60000L
2000L 120000L
3000L 180000L
15000L 600000L



  • currentLatency大于等于50小于100,则notAvailableDuration为0
  • currentLatency大于等于100小于550,则notAvailableDuration为0
  • currentLatency大于等于550小于1000,则notAvailableDuration为300000
  • …等等


再来举个例子:


假设isolation传入true,


long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);


那么notAvailableDuration将传入600000L。结合isAvailable方法,大概流程如下:


RocketMQ为每个Broker预测了个可用时间(当前时间+notAvailableDuration),当当前时间大于该时间,才代表Broker可用,而notAvailableDuration有6个级别和latencyMax的区间一一对应,根据传入的currentLatency去预测该Broker在什么时候可用。


所以再来看这个


public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}


根据执行时间来看落入哪个区间,在0~100的时间内notAvailableDuration都是0,都是可用的,大于该值后,可用的时间就会开始变大了,就认为不是最优解,直接舍弃。


2.3、调用链


org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(xxx)
->
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(xxx) 
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)


2.4、总结


  • 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
  • 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
  • 如果上次失败的Broker可用那么还是会选择该Broker的队列
  • 如果上述情况失败,则随机选择一个进行发送
  • 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间


三、总结


1、疑问


他搞了两个重载send()方法,一个支持算法选择器,一个不支持算法选择,queue的算法选择是个典型的策略模式。为什么send(message)方法内置的queue选择算法不抽出到单独的类中,然后此类实现

org.apache.rocketmq.client.producer.MessageQueueSelector接口呢?比如叫:SelectMessageQueueByBest,比如如下:


public class org.apache.rocketmq.client.producer.DefaultMQProducer {
    // 只发送消息,queue的选择由默认的算法来实现
    @Override
    public SendResult send(Collection<Message> msgs) {
        this.send(msgs, new SelectMessageQueueByBest().select(xxx));
    }
    // 自定义选择queue的算法进行发消息
    @Override
    public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {}
}

我猜测可能是这个算法过于复杂,与其它类的交互也过于多,参数也可能和内置的其他三个不同,所以没搞到一起,但是还是搞到一起规范呀,干的同一件事,只是算法不同,很典型的策略模式。


2、面试


问:发消息的时候选择queue的算法有哪些?

答:分为两种,一种是直接发消息,不能选择queue,这种的queue选择算法如下:

  • 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
  • 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
  • 如果上次失败的Broker可用那么还是会选择该Broker的队列
  • 如果上述情况失败,则随机选择一个进行发送
  • 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间

另外一种是发消息的时候可以选择算法甚至还可以实现接口自定义算法:

  • SelectMessageQueueByRandom:随机
  • SelectMessageQueueByHash:hash
  • SelectMessageQueueByMachineRoom
  • 实现接口自定义

END

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
机器学习/深度学习 人工智能 算法
深度学习入门:理解神经网络与反向传播算法
【9月更文挑战第20天】本文将深入浅出地介绍深度学习中的基石—神经网络,以及背后的魔法—反向传播算法。我们将通过直观的例子和简单的数学公式,带你领略这一技术的魅力。无论你是编程新手,还是有一定基础的开发者,这篇文章都将为你打开深度学习的大门,让你对神经网络的工作原理有一个清晰的认识。
|
2月前
|
机器学习/深度学习 算法 Python
机器学习入门:理解并实现K-近邻算法
机器学习入门:理解并实现K-近邻算法
42 0
|
3月前
|
机器学习/深度学习 算法
机器学习入门(三):K近邻算法原理 | KNN算法原理
机器学习入门(三):K近邻算法原理 | KNN算法原理
|
3月前
|
机器学习/深度学习 算法 大数据
机器学习入门:梯度下降算法(下)
机器学习入门:梯度下降算法(下)
|
3月前
|
机器学习/深度学习 算法 API
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
|
3月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
|
3月前
|
机器学习/深度学习 算法
机器学习入门:梯度下降算法(上)
机器学习入门:梯度下降算法(上)
|
5月前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
62 0
分享一下rocketmq入门小知识
|
2天前
|
算法 数据安全/隐私保护
室内障碍物射线追踪算法matlab模拟仿真
### 简介 本项目展示了室内障碍物射线追踪算法在无线通信中的应用。通过Matlab 2022a实现,包含完整程序运行效果(无水印),支持增加发射点和室内墙壁设置。核心代码配有详细中文注释及操作视频。该算法基于几何光学原理,模拟信号在复杂室内环境中的传播路径与强度,涵盖场景建模、射线发射、传播及接收点场强计算等步骤,为无线网络规划提供重要依据。
|
3天前
|
机器学习/深度学习 数据采集 算法
基于GA遗传优化的CNN-GRU-SAM网络时间序列回归预测算法matlab仿真
本项目基于MATLAB2022a实现时间序列预测,采用CNN-GRU-SAM网络结构。卷积层提取局部特征,GRU层处理长期依赖,自注意力机制捕捉全局特征。完整代码含中文注释和操作视频,运行效果无水印展示。算法通过数据归一化、种群初始化、适应度计算、个体更新等步骤优化网络参数,最终输出预测结果。适用于金融市场、气象预报等领域。
基于GA遗传优化的CNN-GRU-SAM网络时间序列回归预测算法matlab仿真