RocketMQ消息生产者是如何选择Broker的

简介: RocketMQ消息生产者是如何选择Broker的

前言

RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢?

从NameServer查询Topic信息

通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()这个方法,并且我们找到了最关键的Topic信息:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
复制代码

这个方法就是通过topicNameServer拉出对应的Broker信息:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
复制代码

1.一开始的话,是从当前缓存中找Topic信息,第一次肯定是找不到的;

2.找不到Topic信息,那么就调用updateTopicRouteInfoFromNameServer(topic)NameServer拉对应的信息,如果拉到了就更新到缓存中;

3.如果依然找不到Topic信息,说明没有任何Broker上面是有这个Topic的;但是我们还要拉开启了自动创建Topic配置的Broker信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)实现;

生产者客户端会从两个地方获取Broker信息,第一个就是从内存缓存中获取,第二个就是从NameServer中获取。从NameServer中分两次获取,一次是获取存在的Topic对应的Broker信息,第二次是获取还没有创建出来的Topic对应的Broker信息;

如何选择Broker

当客户端拿到了Topic对应的Broker信息后,它是如何选择目标Broker的呢?继续向下看,我们找到了关键代码:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                  ......
复制代码

1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;

2.选择Broker的关键代码就在selectOneMessageQueue()方法中,通过前面拿到的topicPublishInfo作为参数,lastBrokerName作为额外的考虑参数;

追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()中:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
复制代码

1.如果开启了延迟故障规避,那么执行规避策略;

  • 1.1:轮询找一个Broker,该Broker要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);
  • 1.2:如果所有的Broker都没有度过规避期,那么从比较好的那一部分Broker里面找一个出来;
  • 1.3:如果依然没有找到合适的Broker,那么就随机选一个Broker

2.否则就随机选一个Broker

下面我们来看一下随机发送的策略是怎么实现的:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
复制代码

1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个Broker,其实就是轮询方式;

2.如果不是第一次发送消息,那么会尽可能避开上一次的Broker服务,也是为了让Broker服务负载均衡;

3.如果没有避开上一次的Broker,那么再向后找另一个Broker;除非只有一个Broker服务,否则会尽可能避开上次发送的Broker

小结

通过源码分析,我们已经知道了生产者是如何选择目标Broker的了:

1.第一次发消息,通过轮询的方式选择Broker

2.后续发消息会规避上次的Broker,同样采用轮询的方式选择Broker

3.在消息发送过程中,存在一个Broker规避列表,用户可以通过setSendLatencyFaultEnable(true)开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker,如果所有的Broker都在规避列表中,那么会选择一个相对比较好的Broker来用;



相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java Apache
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
684 0
|
2月前
|
消息中间件 Oracle Java
【RocketMq】Broker 启动脚本分析
【RocketMq】Broker 启动脚本分析
37 0
|
5月前
|
安全 数据安全/隐私保护
如何为 Mosquitto MQTT Broker 配置 MQTT TLS 和基于证书的授权
如何为 Mosquitto MQTT Broker 配置 MQTT TLS 和基于证书的授权
451 1
|
5月前
|
消息中间件 存储 缓存
RocketMQ 生产者那些事
这篇文章,我们从源码的角度探寻 RocketMQ Producer 的实现机制。
RocketMQ 生产者那些事
|
7月前
|
消息中间件 存储 Kafka
RocketMQ 源码分析——Broker
1. Broker启动流程分析 2. 消息存储设计 3. 消息写入流程 4. 亮点分析:NRS与NRC的功能号设计 5. 亮点分析:同步双写数倍性能提升的CompletableFuture 6. 亮点分析:Commitlog写入时使用可重入锁还是自旋锁? 7. 亮点分析:零拷贝技术之MMAP提升文件读写性能 8. 亮点分析:堆外内存机制
116 0
|
9月前
|
消息中间件 存储 缓存
RocketMQ之Broker如何实现磁盘文件高性能读写
Broker中大量的使用mmap技术去实现CommitLog这种大磁盘文件的高性能读写优化的。 通过之前的学习,我们知道了一点,就是Broker对磁盘文件的写入主要是借助直接写入os cache来实现性能优化的,因为直接写入os cache,相当于就是写入内存一样的性能,后续等os内核中的线程异步把cache中的数据刷入磁盘文件即可。
EMQ
|
9月前
|
传感器 负载均衡 监控
MQTT Broker 集群解析:基础概念与高级实现
我们希望通过本系列文章全面探索当前的 MQTT 技术,提供有价值的见解,引发有意义的讨论,帮助您的 MQTT 和物联网之旅中激发创新灵感。
EMQ
219 0
MQTT Broker 集群解析:基础概念与高级实现
EMQ
|
10月前
|
SQL 监控 安全
MQTT Broker 规则引擎入门:快速指南
EMQX MQTT Broker 的规则引擎功能在 MQTT 消息转换和数据集成方面起着重要作用。本文将提供一份快速入门指南,通过实例帮助您快速上手 MQTT 规则引擎。
EMQ
336 0
MQTT Broker 规则引擎入门:快速指南
EMQ
|
10月前
|
消息中间件 物联网 机器人
2023 年最适用于工业物联网领域的三款开源 MQTT Broker
本文对比分析了 2023 年工业物联网领域最优秀的三款 MQTT Broker,介绍了它们的优点、缺点和应用场景。
EMQ
792 0
2023 年最适用于工业物联网领域的三款开源 MQTT Broker
EMQ
|
10月前
|
JSON 负载均衡 物联网
使用 Terraform 在 GCP 上一键部署 EMQX MQTT Broker
本文将指导您如何设置 GCP 项目、创建服务账户、编写 Terraform 配置文件,实现在 GCP 上轻松部署 EMQX MQTT Broker。
EMQ
117 0
使用 Terraform 在 GCP 上一键部署 EMQX MQTT Broker