RocketMQ - 生产者消息发送流程

简介: RocketMQ - 生产者消息发送流程

RocketMQ客户端的消息发送通常分为以下3层

  • 业务层:通常指直接调用RocketMQ Client发送API的业务代码。
  • 消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作。
  • 通信层:指RocketMQ基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通信全部使用该通信层。

消息发送流程首先是RocketMQ客户端接收业务层消息,然后通过DefaultMQProducerImpl发送一个RPC请求给Broker,再由Broker处理请求并保存消息。

下面以DefaultMQProducer.send(Messagemsg)接口为例讲解发送流程,

  • 第一步:调用defaultMQProducerImpl.send()方法发送消息。
  • 第二步:通过设置的发送超时时间,调用defaultMQProducerImpl.send()方法发送消息。设置的超时时间可以通过sendMsgTimeout进行变更,其默认值为3s。
  • 第三步:执行 defaultMQProducerImpl.sendDefaultImpl()方法。
private SendResult sendDefaultImpl(
    Message msg,
    //通信模式,同步、异步还是单向
    final CommunicationMode communicationMode,
    //对于异步模式,需要设置发送完成后的回调
    final SendCallback sendCallback,
    final long timeout
){}

该方法是发送消息的核心方法,执行过程分为5步:

第一步: 两个检查:生产者状态、消息及消息内容。没有运行的生产者不能发送消息。消息检查主要检查消息是否为空,消息的Topic的名字是否为空或者是否符合规范;消息体大小是否符合要求,最大值为4MB,可以通过maxMessageSize进行设置。

第二步: 执行tryToFindTopicPublishInfo()方法:获取Topic路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,就通过Namesrv获取路由信息,更新到本地,再返回。具体实现代码如下:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

第三步: 计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。

第四步: 执行队列选择方法selectOneMessageQueue()。根据队列对象中保存的上次发送消息的Broker的名字和Topic路由,选择(轮询)一个Queue将消息发送到 Broker 。 我们可以通过sendLatencyFaultEnable 来设置是否总是发送到延迟级别较低的 Broker,默认值为False。

第五步: 执行sendKernelImpl()方法。该方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层,内部实现是基于Netty的,在封装为通信层request对象RemotingCommand前,会设置RequestCode表示当前请求是发送单个消息还是批量消息。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
5月前
|
消息中间件 数据安全/隐私保护 RocketMQ
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 生产者最佳实践总结
RocketMQ - 生产者最佳实践总结
58 0
|
5月前
|
消息中间件 网络协议 API
RocketMQ - 生产者启动流程
RocketMQ - 生产者启动流程
54 0
|
5月前
|
消息中间件 Kafka Apache
RocketMQ - 生产者原理
RocketMQ - 生产者原理
56 0
|
6月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
172 2
|
6月前
|
消息中间件 负载均衡 Apache
【RocketMQ系列七】消费者和生产者的实现细节
【RocketMQ系列七】消费者和生产者的实现细节
156 1
|
7月前
|
消息中间件 监控 网络安全
在RocketMQ中,生产者提交数据导致连接不上问题
【6月更文挑战第19天】在RocketMQ中,生产者提交数据导致连接不上问题
286 1
|
8月前
|
监控 安全 物联网
阿里云mqtt简介和使用流程
本文介绍了阿里云MQTT的准备工作、简介和使用流程。首先,用户需要注册阿里云账号并完成实名认证。接着,通过阿里云物联网平台创建产品和设备,获取连接所需的Broker Address、Port、Username和Password。然后,使用MQTT客户端(如MQTTX)配置这些信息进行连接,并激活设备。最后,创建并订阅/发布自定义Topic,实现设备间的通信。阿里云MQTT是一个适用于物联网设备的轻量级通信协议,提供高并发、高可靠性的服务,广泛应用于各种物联网场景。
阿里云mqtt简介和使用流程