开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):转发 IndexFile 文件】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12485
转发 IndexFile 文件
转发到 Index 索引文件的过程。
回到 doDispatch 方法当中。
public void doDispatch(DispatchRequest req) {
for(CommitLogDispatcher dispatcher:this.dispatcherList){
dispatcher.dispatch(req);
}
}
然后点击 dispatch 查看 CommitLogDispatcherBuildIndex 接口的实现类。
public void doDispatch(DispatchRequest req) {
for(CommitLogDispatcher dispatcher:this.dispatcherList){
dispatcher.dispatch(req);
}
}
点击进入,会发现调用了 buildIndex 方法。
defaultMessageStore.this.indexService.buildIndex(request);
进入这个方法中,第一步拿到了 IndexFile 索引文件,然后获得索引文件最大的偏移量,目的是做一个判断,如果消息的偏移量小于物理文件的最大偏移量 endPhyOffset ,那说明当前的消息存在重复消息,那么直接返回。
IndexFile indexFile = retryGetAndCreateIndexFile();
if(indexFile != null){
Long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
if(msg.getCommitLogOffset() < endPhyOffset){
return;
}
}
如果当前不存在重复消息,就会根据消息的 ID 去构建索引,然后通过 key 给它构建到索引文件 indexFile 中。先构建 key ,将 key 构建到 indexFile 中。这就是整个索引文件构建的过程。
if(req.getUniqKey() != null){
indexFile= putKey(indexFile,msg,buildKey(topic,req,getUniqKey)));
if(indexFile == null){
Log.error(“putKey error commitlog {} uniqkey {}”,req.getCommitLogOffset(),req.getUniqKey)
return;
}
}
回顾
转发 Index ,入口在 buildIndex 。
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreconfig.isMessageIndexEnable()) {
DefaultMessagestore.this.indexService.buildIndex(request);
}
}
}
在 bulidIndex 中做了几件重要的事,第一个是拿到索引文件。第二个是判断索引文件当中的物理偏移量如果大于消息的偏移量,说明当前的消息是重复的消息,直接返回。如果消息是正常的,根据消息去构建一个唯一的索引 key ,然后将 key 写入 indexFile 中。
IndexFile indexFile = retryGetAndCreateIndexFile();
if(indexFile != null){
Long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
if(msg.getCommitLogOffset() < endPhyOffset){
return;
}
}
if(req.getUniqKey() != null){
indexFile= putKey(indexFile,msg,buildKey(topic,req,getUniqKey)));
if(indexFile == null){
Log.error(“putKeyerrorcommitlog{}uniqkey {}”,req.getCommitLogOffset(),req.getUniqKey)
return;
}
}
以上是转发到Index的流程。