消息发送1-消息校验|学习笔记

简介: 快速学习消息发送1-消息校验

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息发送1-消息校验】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12471


消息发送1-消息校验


消息发送

image.png

消息发送的过程主要分为四步:验证消息、查找路由、选择队列和发送消息

查找路由的目的是要知道当前的消息发给哪个broker,找到broker之后选择队列发送消息。

下面看消息发送的第一步:

1.消息校验

入口在DefaultMQProducer中的send方法,在发送时是通过defaultMQprodueerImpl实现类发送的。

public sendResult send(Nessage msg) throws MQClientException,RemotingException,NQBrokerException

validators.checkPessage(msg,defaultMQProducer: this);

msg.setTopic(withNamespace(mSg-getTopic());

return this.defaultNQprodueerImpl.send(msg);

}

进入defaultNQprodueerImpl:

public SendResult send(Nesskge msg) throws MQClientException,RemotingException,NQBrokerException

return send(msg,this.defaultMQProducer.getsendMsglimeo

ut());

getsendMsglimeout()这个参数是超时时间,默认是3s

以上内容中的send调用的是sendDefaultImpl方法,指定当前发送方式是同步,同步是内有回调函数,timeout是超时时间。

return this.sendDefaultImpl(msg,CommunicationMode.sYNc,sendCallback:i null timeout);

下面是消息发送的全部业务逻辑:

private sendResult sendbefaultimpk

Message msg,

final communicationtode communicationMode,

final Sendcal1back sendcal1back,

final long timeout

)throws wQclientException,RemotingException,MQBrokerException,InterruptedException {

this.makesurestateOK(;

validators.checkMessage(msg, this.dofaultNQProducer);

final long invokeID = random.nextLong(;

long beginTimestampFirst - system.currentTimeMillis();

long beginTimestampPrev - beginTimestampFirst;

long endTimestamp - beginTimestampFirst;

TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

if (topicPublishInfo != nul1 && topicPublishInfo.ok()) {

boolean callTimeout = false;

MessageQueue mq = null;

Exception exception = null;

sendResult sendResult = null;

int timesTotal = communicationMode =m CommunicationMode.sYNc ? 1 + this.defaultMQProducor

int times = 0;

string[brokerssent = new String[timesTotal];

for (; times < timesTotal; timesi) {

在defaultMQprodueerImpl实现类中调用sendmessage,在sendmessage里设置超时时间,再调用senddefaultrImpl,最终的业务逻辑是在senddefaultrImpl完成。

validators.checkMessage是用来验证消息,主要验证:

public static void checkMessage(Message msg,DefaultMQProducer defaultMQProducer)

throws MQclientException {

//判断是否为空

if (null == msg) {

throw new MQc1ientException(Responsecode .MESSAGE_ILLE

GAL,“the message is null");

}

//校验主题(主题名称、长度是否合法,当前主题名称是否为默认主题名)

validators.checkTopic(msg.getTopic());

//校验消息体

if (null == msg.,getBodoy()) {

throw new MQclientException(ResponseCode.MESSAGE_I

LLEGAL,"the message body is nul1");

}

if (0 == msg-getBody ( .length) {

throw new MQClientException(ResponseCode.MESSAGE_I

LLEGAL,"the message body length is zero)

}

//取出消息长度,校验消息程度是否大于getMaxessag

eSize(),默认消息长度最大是4M

If (msg.getBody().longth > defaultMQProducer.getMaxessag

eSize()) {

throw new MQclientException(Responsecpde.MESSAGE_ILL

EGAL,

"the message body size over max value,NAx: " +default'QProducer.getMaxMessageSize())

}

总结:找到DefaultMQProducer中的send方法,之后进入defaultMQprodueerImpl,到return send(msg,this.defaultMQProducer.getsendMsglimeo

ut())设置超时时间,之后到validators.checkMessage(msg, this.dofaultNQProducer)进行消息校验,如主题校验、消息体校验。

相关文章
|
5月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
91 1
|
7月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
消息中间件 NoSQL Redis
消息重复消费的问题
消息重复消费的问题
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
86 0
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
编解码 Java 测试技术
消息类型-普通消息|学习笔记
快速学习消息类型-普通消息
176 0
消息类型-普通消息|学习笔记
|
存储 Java 数据库
消息类型-事务消息|学习笔记
快速学习消息类型-事务消息
164 0
消息类型-事务消息|学习笔记
|
消息中间件 负载均衡 Java
下单消息的发送和接收案例|学习笔记
快速学习下单消息的发送和接收案例
223 0
下单消息的发送和接收案例|学习笔记
|
消息中间件 存储 运维
多类型业务消息专题-普通消息 | 学习笔记(二)
快速学习多类型业务消息专题-普通消息
132 0
多类型业务消息专题-普通消息 | 学习笔记(二)
|
消息中间件 物联网 Linux
Msgrcv 接收消息|学习笔记
快速学习 Msgrcv 接收消息
Msgrcv 接收消息|学习笔记