开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):转发数据到 ConsumerQueue 文件】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12484
转发数据 ConsumerQueue 文件
数据更新到 ConsumerQueue 的一个基本流程。
去循环,调用两个 Dispatcher 。
public void doDispatch(DispatchRequest req){
for(CommitLogDispatcher dispatcher : this.dispatcherList)
{
dispatcher.dispatch(req);
}
}
分别是给 ConsumerQueen 去进行数据分发的 Dispatcher ,和给 index 进行数据分发的 Dispatcher 。
进入给 ConsumerQueue 进行数据分发的类当中
会通过 putMessagePositionInfo 方法去进行具体的数据分发的请求。
defaultMessageStore.this.putMessagePositionInfo(request);
public void putMessagePositionInfo(DispatchRequest dispatchRequest){
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(),dispatchRequest.getQueueId())
Cq.putMessagePositionInfoWrapper(dispatchRequest);
}
发现第一行代码会根据消息主题和队列 ID 获得消息消费队列。现在给了一个消息队列 ConsumerQueue ,要去分发消息,所以先拿到当前这个主题 getTopic ,根据主题 ID getQueue 去拿到消息队列,然后再通过 putMessagePositionInfoWrapper 去处理当前的请求。
有一个循环,这个循环 maxRetries ,循环30次。
这里会将消息偏移量,消息长度, tag 写入到 ByteBuffer 缓冲区当中。前面的代码是做了一些检查的工作。
最核心的代码 putMessagePositionInfo ,在这里才真正进行数据的分发。点进去。
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
首先将消息偏移量,消息长度,tag 写到 byteBuffer 。
然后获得 mappedFile 的文件。
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
通过 mappedFile 追加数据。
return mappedFile.appendMessage(this.byteBufferIndex.array());
数据已经写入到以下位置代码里。
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
这就是给 ConsumerQueue 进行数据分发的基本的流程,也是通过内存映射的方式去进行磁盘写入。最终也拿到了 mappedFile 映射文件,然后追加在内存当中对应的数据。
以上是转发 ConsumerQueue 的基本流程。