RocketMQ高手之路系列之七:RocketMQ之消息发送(四)

简介: 本文主要阐述了消息发送的基本流程。其中包括了获取Broker地址、设置全局ID、构建请求包以及发送消息了。在下篇文章中我们继续来看消息发送的底层流程。

引言

前面我们介绍了RocketMQ在发送消息之前做了一系列的准备事项,其中包括路由选择、队列选择以及坏点Broker退避等等。本文将开始阐述RocketMQ的消息发送过程。

  • 消息发送基本流程
  • 总结

一、消息发送基本流程

下面我们看下进行消息发送的最核心的API,即DefaultMQProducerImpl类中的sendKernelImpl方法如下所示(相关字段注释如下):

private SendResult sendKernelImpl(
//需要发送的消息
final Message msg,
                    //消息需要发送到的消息队列
                                      final MessageQueue mq,
                                      //消息发送模式
                                      final CommunicationMode communicationMode,
                                      //异步消息回调
                                      final SendCallback sendCallback,
                                      //主题路由信息
                                      final TopicPublishInfo topicPublishInfo,
                                      //超时时间
                                      final long timeout)

涉及的具体步骤如下所示:

image.png

我们具体看下源码,相关注释如下所示:

//根据选择的MessageQueue获取对应Broker地址
 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    //如果为获取到
        if (null == brokerAddr) {
          //从NameServer进行主动更新TOPIC信息
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

进行全局ID设置,源码如下:

 if (!(msg instanceof MessageBatch)) {
          //设置全局唯一ID
                    MessageClientIDSetter.setUniqID(msg);
                }
        //消息提的默认大小大小超过4K,则进行zip压缩,并设置消息系统标记
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
        //如果事务,则进行系统事务标记
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

是否执行钩子函数,源码如下所示:

//判断是否进行消息发送钩子函数注册了,为一个列表
if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }
                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

进行消息发送请求包的构建,源码如下所示:

//消息发送请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//设置消息生产组              requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
//设置主题名称            
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//设置默认 在单个Broker默认队列数               
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                //设置系统标记
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
//设置最大重试次数                        
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

最后根据communicationMode,进行消息发送模式的选择,其中包括了同步发送方式、异步发送方式以及单向发送方式,大致的源码如下所示:

SendResult sendResult = null;
                switch (communicationMode) {
                  //异步方式
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    //单向以及同步
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

二、总结

本文主要阐述了消息发送的基本流程。其中包括了获取Broker地址、设置全局ID、构建请求包以及发送消息了。在下篇文章中我们继续来看消息发送的底层流程。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
723 1
|
消息中间件 存储 算法
【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
261 39
|
消息中间件 数据可视化 API
RocketMQ极简入门-RocketMQ普通消息发送
RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消息,批量发送,消息过滤等等。本篇文章来探讨一下 普通消息的发送
472 1
|
消息中间件 存储
RabbitMQ如何保证消息发送成功
RabbitMQ如何保证消息发送成功
450 0
RabbitMQ如何保证消息发送成功
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
226 0
|
消息中间件 存储 Java
Spring Boot 中的 RabbitMQ 消息发送配置
Spring Boot 中的 RabbitMQ 消息发送配置
|
消息中间件 存储 中间件
消息中间件-RocketMQ入门 消息发送的三种方式
消息中间件-RocketMQ入门 消息发送的三种方式
307 0
|
消息中间件 RocketMQ
RocketMQ极简入门-RocketMQ顺序消息发送
在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。 顺序消息故名知意就是消息按照发送的顺序进行消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?
302 0
|
消息中间件 Java RocketMQ
四.RocketMQ极简入门-RocketMQ顺序消息发送
RocketMQ极简入门-RocketMQ顺序消息发送
|
消息中间件 数据可视化 API
三.RocketMQ极简入门-RocketMQ普通消息发送
RocketMQ极简入门-RocketMQ普通消息发送