RocketMQ入门到入土(六)发消息的时候选择queue的算法有哪些?(上)

简介: RocketMQ入门到入土(六)发消息的时候选择queue的算法有哪些?(上)

一、说明


分为两种,一种是直接发消息,client内部有选择queue的算法,不允许外界改变。还有一种是可以自定义queue的选择算法(内置了三种算法,不喜欢的话可以自定义算法实现)。


public class org.apache.rocketmq.client.producer.DefaultMQProducer {
    // 只发送消息,queue的选择由默认的算法来实现
    @Override
    public SendResult send(Collection<Message> msgs) {}
    // 自定义选择queue的算法进行发消息
    @Override
    public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {}
}


二、源码


1、send(msg, mq)


1.1、使用场景


有时候我们不希望默认的queue选择算法,而是需要自定义,一般最常用的场景在顺序消息,顺序消息的发送一般都会指定某组特征的消息都发当同一个queue里,这样才能保证顺序,因为单queue是有序的。


对顺序消息不明白的请看我之前的顺序消息文章。


1.2、原理剖析


内置了三种算法,三种算法都实现了一个共同的接口:


org.apache.rocketmq.client.producer.MessageQueueSelector


  • SelectMessageQueueByRandom
  • SelectMessageQueueByHash
  • SelectMessageQueueByMachineRoom
  • 要想自定义逻辑的话,直接实现接口重写select方法即可。


很典型的策略模式,不同算法不同实现类,有个顶层接口。


1.2.1、SelectMessageQueueByRandom


public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // mqs.size():队列的个数。假设队列个数是4,那么这个value就是0-3之间随机。
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}


so easy,就是纯随机。

mqs.size():队列的个数。假设队列个数是4,那么这个value就是0-3之间随机。


1.2.2、SelectMessageQueueByHash


public class SelectMessageQueueByHash implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        // 防止出现负数,取个绝对值,这也是我们平时开发中需要注意到的点
        if (value < 0) {
            value = Math.abs(value);
        }
        // 直接取余队列个数。
        value = value % mqs.size();
        return mqs.get(value);
    }
}


so easy,就是纯取余。

mqs.size():队列的个数。假设队列个数是4,且value的hashcode是3,那么3 % 4 = 3,那么就是最后一个队列,也就是四号队列,因为下标从0开始。


1.2.3、SelectMessageQueueByMachineRoom


public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }
    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }
    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}


没看懂有啥鸟用,直接return null; 所以如果有自定义需求的话直接自定义就好了,这玩意没看出有啥卵用。


1.2.4、自定义算法


public class MySelectMessageQueue implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(0);
    }
}


永远都选择0号队列,也就是第一个队列。只是举个例子,实际看你业务需求。


1.3、调用链


org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg)
->
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg)
->
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl(xxx)
->
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
->
selector.select(messageQueueList, userMessage, arg)
->
org.apache.rocketmq.client.producer.MessageQueueSelector#select(final List<MessageQueue> mqs, final Message msg, final Object arg)


2、send(msg)


2.1、使用场景


一般没特殊需求的场景都用这个。因为他默认的queue选择算法很不错,各种优化场景都替我们想到了。


2.2、原理剖析


// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl}
// 这是发送消息核心原理,不清楚的看我之前发消息源码分析的文章
// 选择消息要发送的队列
MessageQueue mq = null;
for (int times = 0; times < 3; times++) {
    // 首次肯定是null
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    // 调用下面的方法进行选择queue
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        // 给mq赋值,如果首次失败了,那么下次重试的时候(也就是下次for的时候),mq就有值了。
        mq = mqSelected;
        ......
        // 很关键,能解答下面会提到的两个问题:
        // 1.faultItemTable是什么时候放进去的?
        // 2.isAvailable() 为什么只是判断一个时间就可以知道Broker是否可用?   
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);    
    }
}


选择queue的主入口


public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // 默认为false,代表不启用broker故障延迟
    if (this.sendLatencyFaultEnable) {
        try {
            // 随机数且+1
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            // 遍历
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // 先(随机数 +1) % queue.size()
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0) {
                    pos = 0;
                }
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 看找到的这个queue所属的broker是不是可用的
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    // 非失败重试,直接返回到的队列
                    // 失败重试的情况,如果和选择的队列是上次重试是一样的,则返回
                    // 也就是说如果你这个queue所在的broker可用,
                    // 且不是重试进来的或失败重试的情况,如果和选择的队列是上次重试是一样的,那你就是天选之子了。
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {
                        return mq;
                    }
                }
            }
   // 如果所有队列都不可用,那么选择一个相对好的broker,不考虑可用性的消息队列
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
  // 随机选择一个queue
        return tpInfo.selectOneMessageQueue();
    }
 // 当sendLatencyFaultEnable=false的时候选择queue的方法,默认就是false。
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
机器学习/深度学习 人工智能 算法
AI入门必读:Java实现常见AI算法及实际应用,有两下子!
本文全面介绍了人工智能(AI)的基础知识、操作教程、算法实现及其在实际项目中的应用。首先,从AI的概念出发,解释了AI如何使机器具备学习、思考、决策和交流的能力,并列举了日常生活中的常见应用场景,如手机助手、推荐系统、自动驾驶等。接着,详细介绍了AI在提高效率、增强用户体验、促进技术创新和解决复杂问题等方面的显著作用,同时展望了AI的未来发展趋势,包括自我学习能力的提升、人机协作的增强、伦理法规的完善以及行业垂直化应用的拓展等...
146 3
AI入门必读:Java实现常见AI算法及实际应用,有两下子!
|
27天前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
27 0
分享一下rocketmq入门小知识
|
1月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
123 2
|
2月前
|
消息中间件
云消息队列RabbitMQ版入门训练营 打卡领好礼
云消息队列RabbitMQ版入门训练营 打卡领好礼
37 3
|
2月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
|
2月前
|
消息中间件 存储 缓存
MetaQ/RocketMQ 原理问题之Consume queue中的条目长度是固定的问题如何解决
MetaQ/RocketMQ 原理问题之Consume queue中的条目长度是固定的问题如何解决
|
2月前
|
机器学习/深度学习 数据采集 人工智能
机器学习算法入门与实践
【7月更文挑战第22天】机器学习算法入门与实践是一个既充满挑战又极具吸引力的过程。通过掌握基础知识、理解常见算法、注重数据预处理和模型选择、持续学习新技术和参与实践项目,你可以逐步提高自己的机器学习技能,并在实际应用中取得优异的成绩。记住,机器学习是一个不断迭代和改进的过程,保持好奇心和耐心,你将在这个领域走得更远。
|
2月前
|
消息中间件 存储 算法
实战算法的基础入门(2)
实战算法的基础入门
|
1月前
|
存储 算法
【C算法】编程初学者入门训练140道(1~20)
【C算法】编程初学者入门训练140道(1~20)
|
2月前
|
算法 Java
实战算法的基础入门(3)
实战算法的基础入门