开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息发送4发送消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12474
消息发送4发送消息
接下来看一下消息发送的第四步。
消息发送的第四步就是在前面三步已经完成的验证消息,选择路由,选择队列的基础之上,真正给broker里面开始发送信息。
那么整个的发送过程是什么样呢?代码从哪里获取呢?
在如下图中,找到send
点进到send里面。Return send(msg,this.defaultMQProducer.getSendMSGTimeout());
找到以下代码位置:
MessageQueuemqSelected=this.selectd oneMessageQueue(TopicpublishInfo,LastBrokerName);mq=mqSelected;
接着上一步,选择当前的这个message queen,选择message queen之后,它现在就要开始发送消息,而且调用了sendkerne1Imo1这个方法去发送。
整个的发送的逻辑是在这里面:(nu11==brokerAddr)
在这里面发送的时候,它总共做了哪些事情?如图所示:
首先第一步是先获得了broker的网络地址。之后判断下网络地址,如果是为空就要重新去更新publishInfo,重新去找broker。如下代码所示:
TrytofindtopicPublishIbfo(mq.getTopic())
如果它不为空,就可以真正的去发送消息了。再去发送消息的时候,
首先先对这个消息进行唯一I D的设置,如下代码所示:
MessageclientIDSetter.SerUniqID(msg)
它使用MessageClientIDSetter对消息设置了唯一的一个I D,设置完这个唯一I D之后判断一下这个消息,如果它超过4K,就要进行一个压缩。在这里面有一行代码:
If(this.tryTocompressmessage(msg))
就是tryToCompressMessage,把这个消息传到这个方法当中去校验一下当前是否需要压缩,如下代码所示:
Private Boolean tryTocompressMessage(final Message msg)
从整体来看,它将这个消息的程度做了一个判断,看它是否等于1024*4这个属性是否等于4K,如果它大于等于4K,那说明要去压缩,所以把压缩的标识给它改为true。如下代码所示:
msgBodyCompressed=true
那么在进行发送的时候,它就会对消息进行一个压缩的处理。
在发送消息之前去指定一个钩子函数,这个钩子函数主要的目的就是在发送消息之前做一个逻辑增强的作用.如下代码所示:
If(this.hassendMessagegeHook()
在这里它先要去判断一下,如果发送消息的时候客户端指定了钩子方法, 那么它就在这里先对钩子方法进行执行,然后再去发送消息。如图所示:
先去判断一下,看有没有这个增强逻辑的钩子方法。处理完这个事情之后,就要开始发送信息了。
发送消息它肯定是一个网络请求,所以在这里它就封装了一个请求投的信息,将请求投的信息进行封装。如下图:
在这里将这个request Header的主要信息封装完了之后从sendMessage这个地方向外去发送请求。
怎么向外发送请求?
又要去找broker去发送网络请求,再给这个broker发送消息。所以它依然用的是MQClientInstance,然后它再去调用的MQClientApllmpl去封装这个业务消息发送的逻辑。如下图所示:
1、第一步先去找broker这个网络地址。
2、给broker设置一个唯一I D。
3、判断这个消息是否大于4K,如果大于4K要进行压缩。
4、判断有没有消息发送之前的增强逻辑,如果有那么就去执行,如果没有那就不用理会。
5、封装消息,发送消息请求的请求包,调用MQClientInstance向broker发送消息。
以上为整个消息发送的第四步。