RocketMQ消息生产者是如何选择Broker的

简介: RocketMQ消息生产者是如何选择Broker的

前言

RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢?

从NameServer查询Topic信息

通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()这个方法,并且我们找到了最关键的Topic信息:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
复制代码

这个方法就是通过topicNameServer拉出对应的Broker信息:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
复制代码

1.一开始的话,是从当前缓存中找Topic信息,第一次肯定是找不到的;

2.找不到Topic信息,那么就调用updateTopicRouteInfoFromNameServer(topic)NameServer拉对应的信息,如果拉到了就更新到缓存中;

3.如果依然找不到Topic信息,说明没有任何Broker上面是有这个Topic的;但是我们还要拉开启了自动创建Topic配置的Broker信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)实现;

生产者客户端会从两个地方获取Broker信息,第一个就是从内存缓存中获取,第二个就是从NameServer中获取。从NameServer中分两次获取,一次是获取存在的Topic对应的Broker信息,第二次是获取还没有创建出来的Topic对应的Broker信息;

如何选择Broker

当客户端拿到了Topic对应的Broker信息后,它是如何选择目标Broker的呢?继续向下看,我们找到了关键代码:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                  ......
复制代码

1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;

2.选择Broker的关键代码就在selectOneMessageQueue()方法中,通过前面拿到的topicPublishInfo作为参数,lastBrokerName作为额外的考虑参数;

追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()中:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
复制代码

1.如果开启了延迟故障规避,那么执行规避策略;

  • 1.1:轮询找一个Broker,该Broker要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);
  • 1.2:如果所有的Broker都没有度过规避期,那么从比较好的那一部分Broker里面找一个出来;
  • 1.3:如果依然没有找到合适的Broker,那么就随机选一个Broker

2.否则就随机选一个Broker

下面我们来看一下随机发送的策略是怎么实现的:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
复制代码

1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个Broker,其实就是轮询方式;

2.如果不是第一次发送消息,那么会尽可能避开上一次的Broker服务,也是为了让Broker服务负载均衡;

3.如果没有避开上一次的Broker,那么再向后找另一个Broker;除非只有一个Broker服务,否则会尽可能避开上次发送的Broker

小结

通过源码分析,我们已经知道了生产者是如何选择目标Broker的了:

1.第一次发消息,通过轮询的方式选择Broker

2.后续发消息会规避上次的Broker,同样采用轮询的方式选择Broker

3.在消息发送过程中,存在一个Broker规避列表,用户可以通过setSendLatencyFaultEnable(true)开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker,如果所有的Broker都在规避列表中,那么会选择一个相对比较好的Broker来用;



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
4月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
4月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
276 0
|
5月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 生产者最佳实践总结
RocketMQ - 生产者最佳实践总结
57 0
|
4月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
89 0
|
4月前
|
消息中间件 网络协议 API
RocketMQ - 生产者启动流程
RocketMQ - 生产者启动流程
52 0