RocketMQ学习(六):消息的生命周期上之消息的产生

简介:

源代码版本是3.2.6。消息的生命周期包括2部分,消息的产生和消息的消费,这篇先说下前者。消息的产生详细一点可以分为:

a.消息产生后由Producer发送至Broker。

b.Broker接收到消息做持久化。

调试代码得到这样的过程,

1.DefaultMQProducer.send()发出消息。

2.DefaultMQProducerImpl.sendDefaultImpl()发出消息。

3.DefaultMQProducerImpl.tryToFindTopicPublishInfo(),即向Namesrv发出GET_ROUTEINTO_BY_TOPIC的请求,来更新
MQProducerInner的topicPublishInfoTable和MQConsumerInner的topicSubscribeInfoTable。

4.调用topicPublishInfo.selectOneMessageQueue(),从发布的topic中轮询取出一个MessageQueue。默认一个topic对应4个MessageQueue。

5.调用mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()),获取brokerAddr(broker的地址)。

6.调用this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,// 1
mq.getBrokerName(),// 2
msg,// 3
requestHeader,// 4
timeout,// 5
communicationMode,// 6
sendCallback// 7
)发送。

7.调用MQClientAPIIImpl.sendMessageSync(addr, brokerName, msg, timeoutMillis, request)发送。

8.调用NettyRemotingClient.invokeSyncImpl()发送。

######到此Producer端发消息结束######

———我是分割线———-

######接着Request走到Broker######

9.SendMessageProcessor.processRequest(),接收到消息,封装requestHeader成broker内部的消息MessageExtBrokerInner,然后DefaultMessageStore.putMessage(msgInner),调用CommitLog.putMessage(msg)。

10.调用MapedFileQueue.getLastMapedFile()获取将要写入消息的文件,mapedFile.appendMessage(msg,this.appendMessageCallback)写入消息。

11.AppendMessageCallback.doAppend(fileFromOffset, byteBuffer,maxBlank,Object msg),用回调方法存储msg。

12.MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(),wroteOffset),用存储消息的节点ip和端口,加上准备写的偏移量(就是在前面获取的文件中)生成msgId。

13.以(topic-queueId)为key从topicQueueTable取queueOffset,queueOffset如果为null则设为0,存入topicQueueTable。

14.调用MessageSysFlag.getTransactionValue(msgInner.getSysFlag())获取tranType来判断该消息是否是事务消息,如果是TransactionPreparedType或者TransactionRollbackType,则queueOffset=0,这2种类型的消息是不会被消费的。见16,17。

15.调用byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen)写入文件。

16.构造DispatchRequest,然后DispatchMessageService.putRequest(dispatchRequest),异步DispatchMessageService.doDispatch(),分发消息位置信息到ConsumeQueue。如果是TransactionPreparedType或者TransactionRollbackType,则不处理,如果是TransactionNotType或者TransactionCommitType,则调用DefaultMessageStore.this.putMessagePostionInfo()。

17.调用ConsumeQueue.putMessagePostionInfo(),20个字节大小的buffer在内存里,offset即消息对应的在CommitLog的offset,size即消息在CommitLog存储中的大小,tagsCode即计算出来的长整数,写入buffer,this.mapedFileQueue.getLastMapedFile(expectLogicOffset)获取mapedFile,最后mapedFile.appendMessage(this.byteBufferIndex.array())写入文件,作逻辑队列持久化。

说明:当Broker接收到从Consumer发来的拉取消息的请求时,根据请求的Topic和queueId获取对应的ConsumerQueue,由于消息的类型是预备消息或者回滚消息时,不作持久化(即没有把消息体本身存储在CommitLog中的offset保存到ConsumerQueue中),那么自然也找不到该消息的逻辑存储单元(也就是前面的20个字节,根据这20个字节可以在CommitLog中定位到一条消息),最终Consumer也取不到该消息。

打个比喻,CommitLog是书的正文,消息体存在于CommitLog中,相当于是书正文中的一个章节,那么ConsumerQueue就是书的目录,记录着章节和页数的对应关系,如果是预备类型或者回滚类型的章节,目录中没有记录,即使在书的正文中存在,但是我们查找章节时都是通过目录来查找的,目录里没有,就找不到该章节。

18.DefaultMessageStore.this.indexService.putRequest(this.requestsRead.toArray()),新建索引。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
6月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
168 0
|
3月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
6月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
76 0
|
5月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
58 0
|
6月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
810 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
1438 15
|
6月前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
46 0
|
6月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
93 0