消息存储的流程|学习笔记

简介: 快速学习消息存储的流程

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息存储的流程】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12478


消息存储的流程

 

消息存储整个的入口在defaultmessagestore消息存储最核心的这个类当中

image.png

有一个put message这个方法,是在这里面去处理的。如下代码所示:

PublicPutMessageResult putMessage(MessageExBatch ,essageExtBatch)

这个消息存储的流程当中,rocket M Q为了提高消息存储的效率,首先将这个消息追加到内存当中去,并不会直接写入到磁盘,最后会通过专门的刷盘机制,再将内存当中的消息写到磁盘里面。

所以putMessages方法,最核心的是来看它如何将这个消息追加到内存当中。

消息存储流程的第一步是先去判断当前broker是不是slave,如果broker是slave代表当前这个broker是一个从节点,从节点是不能够去写的,所以先去做一个判断。

第二步判断一下消息主题的程度,包括消息属性的程度等等,这里其实都是在去校验当前这个broker接收到消息的合法性。如图所示:

image.png还做了一个校验,在os pagecatchebusy的一个处理,当前内存当中它是否可用,如果内存不可用,那么进行不了存储。所以前面都是在做校验的工作。

校验如果没有问题之后,那么真正消息写入内存的这个操作是通过commit log去处理的。

进到commit log boot message方法当中。在commit log这个方法当中,它先去设置消息存储时间的一个节点,对这个消息进行时间的设定,然后拿到消息存储的服务,它状态的一个服务。就是如下这行代码:Storestatsservicestorestatsservice=this.defaultMessagestore.gestore,getstorestatsservice

然后就是处理延迟消息的主题和队列,对于延迟消息来讲不需要立即去进行存储,所以以下代码位置进行了对应处理。

If(messageExtBatch.getDelayTimeLevel() > 0) {

Return new PutMessageResult(putMessageStatus.MESSAGE_ILLEGAL,appendMessageResult:null);

如果当前的是延迟消息,就要进行一个特殊的处理。

现在要去存储这个消息了,rocket M Q它用的是这种文件内存映射的机制,首先是拿到了这个文件存储的commit log所对应的mapped file的映射文件,拿到最后一个映射文件。如下代码所示:

MappedFile mappedFile=this.mappledFileQueue.getLastMappedFile()

拿到这个映射文件之后从如下图所示的这个地方开始,去给这个映射文件进行存储。

image.png

对于如下图上面来看:

image.png

整个的存储的过程当中它是一个同步的,并且在这里是加锁的状态:

如下代码所示:

Put Message?Lock.lock()

同步之后是为了多线程情况之下,保证线程的安全。消息的存储拿到这个mapped file之后先去判断了一下,看这个mapped file是不是为空,或者它是不是已经写满,如果为空或者写满都要重新的去创建一个新的。

If(null == mappedFile || mappedFile.isFull()) {

mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset:0);}

如果没有问题就开始去进行消息的追加,把它追加到这个内存映射文件当中去。在mapped file这里面调用了它的一个open messages这个方法,它的这个方法在这里面又调用了一个append message inner这个方法。

最终其实用的就是mapped file这里面的append message inner这个方法再进行消息的一个追加。

它的追加的过程中,是先去获得了消息的写的指针,看现在是往哪个位置去写,下图就是拿到它写的位置。如下代码所示:

Int currentPos=this.wrotePosition.ger()

If(currentPos<this.fileSize)

如果当前这个指针小于文件的总的大小,那说明当前没有写满,这个写的操作是合法的。如果走到这一步发现这个写的这个指针已经大于并且超过文件的总大小了,就不能去写了。

如下图:

image.png在这里面,在整个过程当中首先将这个消息,将它写入到缓冲区里,byteBuffer它其实就是在对外申请了一个缓冲区,然后通过这个cb.doAppend真正去完成追加消息的处理。

cb.doAppend里。它首先拿到当前写的位置。

Long wroteoffest=fileFromoffest+byteBuffer.positon()

然后又设置了一个消息的I D,如下图所示:

就在之前开头图上的这一步:

image.png将这个获得消息在队列当中设一个偏移量。现在要往哪个位置上去写就要把这个位置确定下来。

Long  queueOffset = CommitLog.this.topicQueueTable.get(key);

因为commit log这个映射文件,它有可能是已经写了一半了,要得知道往哪个位置去写。这就是拿到这个消息后写入的一个偏移量。

所以整个在这个下图中就开始写,如图所示:

image.png

注意:比如写的这个偏移量它应该是在这里,但是整个消息的长度如果是这么长。就是现在从左到右这里面已经写满了。如果消息的总长度是这么长,而消息的总长度是这么大。那么这个消息肯定是写不到里面的所以在真正在去写入之前,它又去对整个长度进行了处理。示意图如下图所示:

image.png

计算消息的长度再计算整个消息的总长度,判断一下,消息的总长度如果超过了文件剩余的大小,那么就要去创建新文件了。

从如下代码这个位置开始:

if (propertiesLength> Short MAX-VALUE)

If(

拿出这个属性的集合,再去算出属性的长度,这是topic topic的长度

如下代码所示:

final byte[] topicData =msgInner.getTopic(). getBytes(MessageDecoder.CHARSET UTF8);

这是body的长度,如下代码所示:

final int bodyLength=msgInner. getBody()==null: msgInner.getBody().longth; 

这是消息的长度

如下代码所示:

final int msglencaLMsglength(bodyLength, topiclength, propertieslength) 

通过bodyLength这个方法计算总长度,如果总长度大于剩余的文件的大小,那么就让它在如下图位置中重新再创建一个新的文件,然后再去写。

给byteBuffer当中写入这个消息,写0到msgLen,如下代码 :

byteBuffer.put(this.msgstoreItemMemory.array(), offset:0 msgLen

整个过程是它把这个消息真正的写入到了内存里,并没有直接把它写入到这个词盘里。

image.png

那么对于图上面来看,就是在如下这个位置。

image.png

它写完之后去更新了一下消息队列的一个偏移量。

在如下这个位置

If((totalMsgLen+END FILE MIN BLANK LENGTH)>maxBlank)

在这里看到的就是消息的长度,如果大于总的长度,要去进行新的commitlog文件的创建。如果都没有问题,就开始去写。写完之后它对于整个消息的偏移量进行了更新的处理。

整个这个位置就会通过doAppend回调,整个写完之后就可以去返回。它是从以下这个位置回调的:result = cb.doAppEend(this . getFileFromOffset(), byteBuffer,maxBlank; this. filesize . currentPos, (MessageExtBrokerInne <

在cb.doAppend,它整个写完后将这个结果返回给mappedfile,mappedfile再将这个结果返回给commitlog。

result = mappedFile.appendMessages(messageExtBatch,this.appendMessageCallBack);

来找一下commit log能够看到mappedfile再从下图找mappedfile去写。

image.png写完之后,mappedfile就把结果返回来了,可以发现整个方法就已经完成了对内存的追加。追加完了后整个过程下面还有两个代码。如下代码所示:result = cb.doAppEend(this . getFileFromOffset(), byteBuffer,maxBlank; this. filesize . currentPos, (MessageExtBrokerInne <

这两个代码是在刷盘。

之前所看的整个追加的过程,其实仅仅只是仅仅将消息追加到内存当中去。真正的写到磁盘要在它追加完了后得通过handle DiskFlush这个方法去进行刷盘的处理。代码如下所示:

handleDiskFlush(result, putMessageResult, messageExtBatch);

以上是文件存储的流程。

相关文章
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3869 1
|
消息中间件 Java 调度
自顶向下学习 RocketMQ(六):定时消息
定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。
自顶向下学习 RocketMQ(六):定时消息
|
4月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
175 3
|
5月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(五):消息存储
深入理解Kafka核心设计及原理(五):消息存储
167 8
|
5月前
|
消息中间件 Kafka 程序员
彻底搞懂Kafka生产消费流程,这篇文章就够了!
```markdown 🚀 Kafka 生产消费流程揭秘:Producer 创建守护线程Sender,消息经拦截器→序列化器→分区器→缓冲区。批量发送基于batch.size或linger.ms条件。acks参数控制可靠性,从0(最快但不可靠)到all(最可靠)。消息重试和元数据返回确保不丢失。关注“软件求生”公众号,探索更多技术! ```
214 1
|
5月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
|
5月前
|
消息中间件 存储 RocketMQ
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
|
6月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 存储 RocketMQ
消息队列 MQ产品使用合集之如何防止丢数据
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
7月前
|
消息中间件 存储 RocketMQ
大白话-设计RocketMQ延迟消息
RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间的延迟,这一点对于有强迫症的朋友来说就比较难受,但是搞明白为什么这么设计后,就自然释怀了。

热门文章

最新文章