【消息中间件】RocketMQ的默认发送流程,你简单的回答下?

本文涉及的产品
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 今天我们就开始学习下默认消息发送流程,学习他的实现思路,也帮助我们工作中,遇到了问题不会手足无措。

前言

大家好,我是小郭,上一篇文章我们介绍了RocketMQ消息发送中的默认消息发送者,打开源码跟踪了消息发送者从未启动到运行中的一个过程,今天我们就开始学习下默认消息发送流程,学习他的实现思路,也帮助我们工作中,遇到了问题不会手足无措。

思考问题

  • 消息发送者是如何做负载均衡的?
  • 消息发送者是如何保证高可用的?
  • 消息发送批量消息如何保证一致性的?

默认发送流程-工作原理

源码入口:

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)启动Demo:

DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("xxx:9876"); 
producer.start();
Message msg = new Message("TopicTest" /* Topic */,
    "TagA" /* Tag */,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);

流程:

  1. 校验主题,设置主题
msg.setTopic(withNamespace(msg.getTopic()));
public String withNamespace(String resource) {
    return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}
  1. 默认发送方式为同步发送,默认超时时间为3s
private int sendMsgTimeout = 3000;
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
  1. 确认 producer service 运行状态是否为运行中 入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#makeSureStateOK
//检查状态,如果不是RUNNING状态则抛出异常
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}
  1. 校验信息
    topic长达是否大于TOPIC_MAX_LENGTH,topic是否为空
    是否通过正则校验,body是否为空,body大小是否超过4M
public static void checkTopic(String topic) throws MQClientException {
    if (UtilAll.isBlank(topic)) {
        throw new MQClientException("The specified topic is blank", null);
    }
    if (topic.length() > TOPIC_MAX_LENGTH) {
        throw new MQClientException(
            String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
    }
    if (isTopicOrGroupIllegal(topic)) {
        throw new MQClientException(String.format(
                "The specified topic[%s] contains illegal characters, allowing only %s", topic,
                "^[%|a-zA-Z0-9_-]+$"), null);
    }
}
// body
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
  1. 找到主题发布的信息,未找到则抛出异常

入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo

消息生产者更新和维护路由信息缓存

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 消息生产者更新和维护路由信息缓存,同时保存到本地生产者路由表中
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
  1. 通过TopicPublishInfo 找到对应的MessageQueue下的,BrokerName信息 入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue

获取到BrokerName对应的MessageQueue信息

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

如果lastBrokerName为null,通过对 sendWhichQueue 方法获取一个队列

取余,然后从messageQueueList中获取一个MessageQueue

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
  1. 最后消息发送 入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
  2. 根绝BrokerName获取到broker地址

在启动阶段,对BrokerAddrTable信息进行了维护

public String findBrokerAddressInPublish(final String brokerName) {
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }
    return null;
}

如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常

if (null == brokerAddr) {
    // 1.1 如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
  1. 为消息分配全局唯一ID
// 为消息分配全局唯一ID
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

RocketMQ消息发送-请求与响应文章中,我们已经学习了请求参数中,创建了全局唯一的MsgId,可以回头看一看

  1. 注册钩子消息发送钩子函数

这里主要做了三件事情,确认MsgType类型、是否为延迟消息、调用钩子函数内的方法

if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // 3.1 通过isTrans来确定MsgType类型
    if ("true".equals(isTrans)) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }
     // 3.2 如果msg里面 __STARTDELIVERTIME 或者 DELAY 不为空,则设置为延迟消息
    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    // 3.3 调用钩子函数里的方法
    this.executeSendMessageHookBefore(context);
}
  1. 设置发送信息请求头SendMessageRequestHeader

网络异常,图片无法展示
|

最后根据默认发送方式,进行消息的发送

主要利用NettyRemotingClient进行发送,这里就先不展开来说了 入口:MQClientAPIImpl.sendMessage()

问题答复

  • 消息发送者是如何做负载均衡的?默认采用轮询,每一个消息发送者全局会维护一个 Topic 上一次选择的队列,然后基于这个序号进行递增轮询
  1. AllocateMessageQueueAveragely
  • 平均分配,按照总数除以消费者个数进行,对每个消费者进行分配
  1. AllocateMessageQueueAveragelyByCircle 轮流平均分配,按照消费者个数,进行轮询分配
  • 消息发送者是如何保证高可用的?
    在上面的步骤中通过TopicPublishInfo 找到对应的MessageQueue下的,BrokerName信息,利用参数sendLatencyFaultEnable来开启关闭故障规避机制
    sendLatencyFaultEnable 设置为 true:开启延迟规避机制,一旦消息发送失败会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息。
    使用本次消息发送延迟时间来计算Broker故障规避时长,不参与消息发送队列负载
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    if (notBestBroker != null) {
        mq.setBrokerName(notBestBroker);
        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
    }
    return mq;
}
// 按照轮询进行选择发送的MessageQueue            
for (int i = 0; i < this.messageQueueList.size(); i++) {     
    int index = this.sendWhichQueue.getAndIncrement();        
    int pos = Math.abs(index) % this.messageQueueList.size();         
    if (pos < 0)            
        pos = 0;                
    MessageQueue mq = this.messageQueueList.get(pos);        
    // 避开上一次上一次发送失败的MessageQueue                
    if (!mq.getBrokerName().equals(lastBrokerName)) {      
        return mq;          
    }         
 }          

但是这样子做可能带来的后果是Broker没有可用的情况,或者是某个Broker数据激增,增加消费者的压力,所以默认不开启规避机制,遇到消息发送失败,规避 broker-a,但是在下一次消息发送时,即再次调用broker-a。

  • 消息发送批量消息如何保证一致性的?
    将一个Topic下的消息,通过batch方法包一起发送

客户端ID与使用陷阱

摘自丁威老师的文章

网络异常,图片无法展示
|

总结

这段时间主要学习了RocketMQ的消息发送,主要是以源码为主,深入了解了消息发送的启动和消息发送的流程,以及认识到客户端ID与使用陷阱 一图总结


网络异常,图片无法展示
|

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
617 2
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
1193 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
5月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
3月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
229 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
859 92
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
377 97
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。

热门文章

最新文章

相关产品

  • 云消息队列 MQ