引言
前面我们介绍了RocketMQ在发送消息之前做了一系列的准备事项,其中包括路由选择、队列选择以及坏点Broker退避等等。本文将开始阐述RocketMQ的消息发送过程。
- 消息发送基本流程
- 总结
一、消息发送基本流程
下面我们看下进行消息发送的最核心的API,即DefaultMQProducerImpl类中的sendKernelImpl方法如下所示(相关字段注释如下):
private SendResult sendKernelImpl( //需要发送的消息 final Message msg, //消息需要发送到的消息队列 final MessageQueue mq, //消息发送模式 final CommunicationMode communicationMode, //异步消息回调 final SendCallback sendCallback, //主题路由信息 final TopicPublishInfo topicPublishInfo, //超时时间 final long timeout)
涉及的具体步骤如下所示:
我们具体看下源码,相关注释如下所示:
//根据选择的MessageQueue获取对应Broker地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); //如果为获取到 if (null == brokerAddr) { //从NameServer进行主动更新TOPIC信息 tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); }
进行全局ID设置,源码如下:
if (!(msg instanceof MessageBatch)) { //设置全局唯一ID MessageClientIDSetter.setUniqID(msg); } //消息提的默认大小大小超过4K,则进行zip压缩,并设置消息系统标记 int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } //如果事务,则进行系统事务标记 final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; }
是否执行钩子函数,源码如下所示:
//判断是否进行消息发送钩子函数注册了,为一个列表 if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); }
进行消息发送请求包的构建,源码如下所示:
//消息发送请求头 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); //设置消息生产组 requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); //设置主题名称 requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); //设置默认 在单个Broker默认队列数 requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); //设置系统标记 requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { //设置最大重试次数 requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } }
最后根据communicationMode,进行消息发送模式的选择,其中包括了同步发送方式、异步发送方式以及单向发送方式,大致的源码如下所示:
SendResult sendResult = null; switch (communicationMode) { //异步方式 case ASYNC: Message tmpMessage = msg; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); msg.setBody(prevBody); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; //单向以及同步 case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; }
二、总结
本文主要阐述了消息发送的基本流程。其中包括了获取Broker地址、设置全局ID、构建请求包以及发送消息了。在下篇文章中我们继续来看消息发送的底层流程。
