开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息存储的流程】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12478
消息存储的流程
消息存储整个的入口在defaultmessagestore消息存储最核心的这个类当中
有一个put message这个方法,是在这里面去处理的。如下代码所示:
PublicPutMessageResult putMessage(MessageExBatch ,essageExtBatch)
这个消息存储的流程当中,rocket M Q为了提高消息存储的效率,首先将这个消息追加到内存当中去,并不会直接写入到磁盘,最后会通过专门的刷盘机制,再将内存当中的消息写到磁盘里面。
所以putMessages方法,最核心的是来看它如何将这个消息追加到内存当中。
消息存储流程的第一步是先去判断当前broker是不是slave,如果broker是slave代表当前这个broker是一个从节点,从节点是不能够去写的,所以先去做一个判断。
第二步判断一下消息主题的程度,包括消息属性的程度等等,这里其实都是在去校验当前这个broker接收到消息的合法性。如图所示:
还做了一个校验,在os pagecatchebusy的一个处理,当前内存当中它是否可用,如果内存不可用,那么进行不了存储。所以前面都是在做校验的工作。
校验如果没有问题之后,那么真正消息写入内存的这个操作是通过commit log去处理的。
进到commit log boot message方法当中。在commit log这个方法当中,它先去设置消息存储时间的一个节点,对这个消息进行时间的设定,然后拿到消息存储的服务,它状态的一个服务。就是如下这行代码:Storestatsservicestorestatsservice=this.defaultMessagestore.gestore,getstorestatsservice
然后就是处理延迟消息的主题和队列,对于延迟消息来讲不需要立即去进行存储,所以以下代码位置进行了对应处理。
If(messageExtBatch.getDelayTimeLevel() > 0) {
Return new PutMessageResult(putMessageStatus.MESSAGE_ILLEGAL,appendMessageResult:null);
如果当前的是延迟消息,就要进行一个特殊的处理。
现在要去存储这个消息了,rocket M Q它用的是这种文件内存映射的机制,首先是拿到了这个文件存储的commit log所对应的mapped file的映射文件,拿到最后一个映射文件。如下代码所示:
MappedFile mappedFile=this.mappledFileQueue.getLastMappedFile()
拿到这个映射文件之后从如下图所示的这个地方开始,去给这个映射文件进行存储。
对于如下图上面来看:
整个的存储的过程当中它是一个同步的,并且在这里是加锁的状态:
如下代码所示:
Put Message?Lock.lock()
同步之后是为了多线程情况之下,保证线程的安全。消息的存储拿到这个mapped file之后先去判断了一下,看这个mapped file是不是为空,或者它是不是已经写满,如果为空或者写满都要重新的去创建一个新的。
If(null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset:0);}
如果没有问题就开始去进行消息的追加,把它追加到这个内存映射文件当中去。在mapped file这里面调用了它的一个open messages这个方法,它的这个方法在这里面又调用了一个append message inner这个方法。
最终其实用的就是mapped file这里面的append message inner这个方法再进行消息的一个追加。
它的追加的过程中,是先去获得了消息的写的指针,看现在是往哪个位置去写,下图就是拿到它写的位置。如下代码所示:
Int currentPos=this.wrotePosition.ger()
If(currentPos<this.fileSize)
如果当前这个指针小于文件的总的大小,那说明当前没有写满,这个写的操作是合法的。如果走到这一步发现这个写的这个指针已经大于并且超过文件的总大小了,就不能去写了。
如下图:
在这里面,在整个过程当中首先将这个消息,将它写入到缓冲区里,byteBuffer它其实就是在对外申请了一个缓冲区,然后通过这个cb.doAppend真正去完成追加消息的处理。
cb.doAppend里。它首先拿到当前写的位置。
Long wroteoffest=fileFromoffest+byteBuffer.positon()
然后又设置了一个消息的I D,如下图所示:
就在之前开头图上的这一步:
将这个获得消息在队列当中设一个偏移量。现在要往哪个位置上去写就要把这个位置确定下来。
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
因为commit log这个映射文件,它有可能是已经写了一半了,要得知道往哪个位置去写。这就是拿到这个消息后写入的一个偏移量。
所以整个在这个下图中就开始写,如图所示:
注意:比如写的这个偏移量它应该是在这里,但是整个消息的长度如果是这么长。就是现在从左到右这里面已经写满了。如果消息的总长度是这么长,而消息的总长度是这么大。那么这个消息肯定是写不到里面的所以在真正在去写入之前,它又去对整个长度进行了处理。示意图如下图所示:
计算消息的长度再计算整个消息的总长度,判断一下,消息的总长度如果超过了文件剩余的大小,那么就要去创建新文件了。
从如下代码这个位置开始:
if (propertiesLength> Short MAX-VALUE)
If(
拿出这个属性的集合,再去算出属性的长度,这是topic topic的长度
如下代码所示:
final byte[] topicData =msgInner.getTopic(). getBytes(MessageDecoder.CHARSET UTF8);
这是body的长度,如下代码所示:
final int bodyLength=msgInner. getBody()==null: msgInner.getBody().longth;
这是消息的长度
如下代码所示:
final int msglencaLMsglength(bodyLength, topiclength, propertieslength)
通过bodyLength这个方法计算总长度,如果总长度大于剩余的文件的大小,那么就让它在如下图位置中重新再创建一个新的文件,然后再去写。
给byteBuffer当中写入这个消息,写0到msgLen,如下代码 :
byteBuffer.put(this.msgstoreItemMemory.array(), offset:0 msgLen
整个过程是它把这个消息真正的写入到了内存里,并没有直接把它写入到这个词盘里。
那么对于图上面来看,就是在如下这个位置。
它写完之后去更新了一下消息队列的一个偏移量。
在如下这个位置
If((totalMsgLen+END FILE MIN BLANK LENGTH)>maxBlank)
在这里看到的就是消息的长度,如果大于总的长度,要去进行新的commitlog文件的创建。如果都没有问题,就开始去写。写完之后它对于整个消息的偏移量进行了更新的处理。
整个这个位置就会通过doAppend回调,整个写完之后就可以去返回。它是从以下这个位置回调的:result = cb.doAppEend(this . getFileFromOffset(), byteBuffer,maxBlank; this. filesize . currentPos, (MessageExtBrokerInne <
在cb.doAppend,它整个写完后将这个结果返回给mappedfile,mappedfile再将这个结果返回给commitlog。
result = mappedFile.appendMessages(messageExtBatch,this.appendMessageCallBack);
来找一下commit log能够看到mappedfile再从下图找mappedfile去写。
写完之后,mappedfile就把结果返回来了,可以发现整个方法就已经完成了对内存的追加。追加完了后整个过程下面还有两个代码。如下代码所示:result = cb.doAppEend(this . getFileFromOffset(), byteBuffer,maxBlank; this. filesize . currentPos, (MessageExtBrokerInne <
这两个代码是在刷盘。
之前所看的整个追加的过程,其实仅仅只是仅仅将消息追加到内存当中去。真正的写到磁盘要在它追加完了后得通过handle DiskFlush这个方法去进行刷盘的处理。代码如下所示:
handleDiskFlush(result, putMessageResult, messageExtBatch);
以上是文件存储的流程。