消息发送4发送消息|学习笔记

简介: 快速学习消息发送4发送消息

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息发送4发送消息】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12474


消息发送4发送消息


接下来看一下消息发送的第四步。

消息发送的第四步就是在前面三步已经完成的验证消息,选择路由,选择队列的基础之上,真正给broker里面开始发送信息。

那么整个的发送过程是什么样呢?代码从哪里获取呢?

在如下图中,找到send

image.png点进到send里面。Return send(msg,this.defaultMQProducer.getSendMSGTimeout());

找到以下代码位置:

MessageQueuemqSelected=this.selectd oneMessageQueue(TopicpublishInfo,LastBrokerName);mq=mqSelected;

接着上一步,选择当前的这个message queen,选择message queen之后,它现在就要开始发送消息,而且调用了sendkerne1Imo1这个方法去发送。

整个的发送的逻辑是在这里面:(nu11==brokerAddr)

在这里面发送的时候,它总共做了哪些事情?如图所示:

image.png首先第一步是先获得了broker的网络地址。之后判断下网络地址,如果是为空就要重新去更新publishInfo,重新去找broker。如下代码所示:

TrytofindtopicPublishIbfo(mq.getTopic())

如果它不为空,就可以真正的去发送消息了。再去发送消息的时候,

首先先对这个消息进行唯一I D的设置,如下代码所示:

MessageclientIDSetter.SerUniqID(msg)

它使用MessageClientIDSetter对消息设置了唯一的一个I D,设置完这个唯一I D之后判断一下这个消息,如果它超过4K,就要进行一个压缩。在这里面有一行代码:

If(this.tryTocompressmessage(msg))

就是tryToCompressMessage,把这个消息传到这个方法当中去校验一下当前是否需要压缩,如下代码所示:

Private Boolean tryTocompressMessage(final Message msg)

从整体来看,它将这个消息的程度做了一个判断,看它是否等于1024*4这个属性是否等于4K,如果它大于等于4K,那说明要去压缩,所以把压缩的标识给它改为true。如下代码所示:

msgBodyCompressed=true

那么在进行发送的时候,它就会对消息进行一个压缩的处理。

在发送消息之前去指定一个钩子函数,这个钩子函数主要的目的就是在发送消息之前做一个逻辑增强的作用.如下代码所示:

If(this.hassendMessagegeHook()

在这里它先要去判断一下,如果发送消息的时候客户端指定了钩子方法, 那么它就在这里先对钩子方法进行执行,然后再去发送消息。如图所示:

image.png先去判断一下,看有没有这个增强逻辑的钩子方法。处理完这个事情之后,就要开始发送信息了。

发送消息它肯定是一个网络请求,所以在这里它就封装了一个请求投的信息,将请求投的信息进行封装。如下图:

image.png在这里将这个request Header的主要信息封装完了之后从sendMessage这个地方向外去发送请求。

怎么向外发送请求?

又要去找broker去发送网络请求,再给这个broker发送消息。所以它依然用的是MQClientInstance,然后它再去调用的MQClientApllmpl去封装这个业务消息发送的逻辑。如下图所示:

image.png1、第一步先去找broker这个网络地址。

2、给broker设置一个唯一I D。

3、判断这个消息是否大于4K,如果大于4K要进行压缩。

4、判断有没有消息发送之前的增强逻辑,如果有那么就去执行,如果没有那就不用理会。

5、封装消息,发送消息请求的请求包,调用MQClientInstance向broker发送消息。

以上为整个消息发送的第四步。

 

相关文章
|
5月前
|
消息中间件 存储 安全
mq 消费者监听经常断会出现丢消息的问题吗
在消息队列(MQ)系统中,消费者监听经常断开可能会导致消息丢失的问题,具体取决于消息队列系统的设计和配置,以及你的应用程序的处理方式。以下是一些可能导致消息丢失问题的情况: 1. **消费者断开连接:** 如果消费者监听过程中发生意外断开,例如网络故障、消费者应用程序崩溃等,那么在断开连接的瞬间,可能存在未被消费的消息。 2. **消息确认机制:** 消息队列通常提供消息确认机制,确保消息在被成功处理后才被从队列中移除。如果你的消费者应用程序在处理消息时没有发送确认,或者确认机制配置不正确,可能导致消息在被处理前被从队列中移除,从而丢失。 3. **持久化设置:** 消息队列通常提供持久
|
7月前
|
消息中间件
RabbitMQ如何确保消息发送,消息接收
RabbitMQ如何确保消息发送,消息接收
45 0
|
消息中间件 缓存 RocketMQ
消息发送5-总结|学习笔记
快速学习消息发送5-总结
42 0
消息发送5-总结|学习笔记
|
消息中间件 存储 运维
【视频】普通消息 | 学习笔记
快速学习【视频】普通消息
302 0
【视频】普通消息 | 学习笔记
|
消息中间件 物联网 Linux
Msgsnd 消息发送|学习笔记
快速学习 Msgsnd 消息发送
185 0
|
消息中间件 缓存 负载均衡
消息发送3-选择队列|学习笔记
快速学习消息发送3-选择队列
85 0
消息发送3-选择队列|学习笔记
|
消息中间件 物联网 Linux
Msgrcv 接收消息|学习笔记
快速学习 Msgrcv 接收消息
267 0
Msgrcv 接收消息|学习笔记
|
消息中间件 RocketMQ 开发者
发送异步消息|学习笔记
快速学习发送异步消息
57 0
|
前端开发 Java Nacos
服务生产者 | 学习笔记
快速学习服务生产者。
59 0
服务生产者 | 学习笔记
|
消息中间件 存储 网络协议
RabbitMQ——消息发送和消息接收机制
RabbitMQ——消息发送和消息接收机制
RabbitMQ——消息发送和消息接收机制