开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息发送1-消息校验】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12471
消息发送1-消息校验
消息发送
消息发送的过程主要分为四步:验证消息、查找路由、选择队列和发送消息
查找路由的目的是要知道当前的消息发给哪个broker,找到broker之后选择队列发送消息。
下面看消息发送的第一步:
1.消息校验
入口在DefaultMQProducer中的send方法,在发送时是通过defaultMQprodueerImpl实现类发送的。
public sendResult send(Nessage msg) throws MQClientException,RemotingException,NQBrokerException
validators.checkPessage(msg,defaultMQProducer: this);
msg.setTopic(withNamespace(mSg-getTopic());
return this.defaultNQprodueerImpl.send(msg);
}
进入defaultNQprodueerImpl:
public SendResult send(Nesskge msg) throws MQClientException,RemotingException,NQBrokerException
return send(msg,this.defaultMQProducer.getsendMsglimeo
ut());
getsendMsglimeout()这个参数是超时时间,默认是3s
以上内容中的send调用的是sendDefaultImpl方法,指定当前发送方式是同步,同步是内有回调函数,timeout是超时时间。
return this.sendDefaultImpl(msg,CommunicationMode.sYNc,sendCallback:i null timeout);
下面是消息发送的全部业务逻辑:
private sendResult sendbefaultimpk
Message msg,
final communicationtode communicationMode,
final Sendcal1back sendcal1back,
final long timeout
)throws wQclientException,RemotingException,MQBrokerException,InterruptedException {
this.makesurestateOK(;
validators.checkMessage(msg, this.dofaultNQProducer);
final long invokeID = random.nextLong(;
long beginTimestampFirst - system.currentTimeMillis();
long beginTimestampPrev - beginTimestampFirst;
long endTimestamp - beginTimestampFirst;
TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != nul1 && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
sendResult sendResult = null;
int timesTotal = communicationMode =m CommunicationMode.sYNc ? 1 + this.defaultMQProducor
int times = 0;
string[brokerssent = new String[timesTotal];
for (; times < timesTotal; timesi) {
在defaultMQprodueerImpl实现类中调用sendmessage,在sendmessage里设置超时时间,再调用senddefaultrImpl,最终的业务逻辑是在senddefaultrImpl完成。
validators.checkMessage是用来验证消息,主要验证:
public static void checkMessage(Message msg,DefaultMQProducer defaultMQProducer)
throws MQclientException {
//判断是否为空
if (null == msg) {
throw new MQc1ientException(Responsecode .MESSAGE_ILLE
GAL,“the message is null");
}
//校验主题(主题名称、长度是否合法,当前主题名称是否为默认主题名)
validators.checkTopic(msg.getTopic());
//校验消息体
if (null == msg.,getBodoy()) {
throw new MQclientException(ResponseCode.MESSAGE_I
LLEGAL,"the message body is nul1");
}
if (0 == msg-getBody ( .length) {
throw new MQClientException(ResponseCode.MESSAGE_I
LLEGAL,"the message body length is zero)
}
//取出消息长度,校验消息程度是否大于getMaxessag
eSize(),默认消息长度最大是4M
If (msg.getBody().longth > defaultMQProducer.getMaxessag
eSize()) {
throw new MQclientException(Responsecpde.MESSAGE_ILL
EGAL,
"the message body size over max value,NAx: " +default'QProducer.getMaxMessageSize())
}
总结:找到DefaultMQProducer中的send方法,之后进入defaultMQprodueerImpl,到return send(msg,this.defaultMQProducer.getsendMsglimeo
ut())设置超时时间,之后到validators.checkMessage(msg, this.dofaultNQProducer)进行消息校验,如主题校验、消息体校验。