RocketMQ给broker发送消息确定Commitlog的写入的位置

简介: 问题有一个疑问,当client给broker发送消息的时候,怎么知道在commitlog的第几个字节开始写呢?

文件格式概述


commitlog消息单元存储结构


commitlog中存储的是客户端发送的所有数据


微信截图_20230225153656.png

ConsumeQueue消息单元存储结构


ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。


1.png


流程图


2.png


源码跟踪(broker启动流程里)


入口方法


DefaultMessageStore###load


public boolean load() {
        boolean result = true;
        try {
           //省略
            // 装载Commit Log
            result = result && this.commitLog.load();
            if (result) {
                //省略
                //确定Commit Log文件下一个写的位置
                this.recover(lastExitOK);
            }
        } catch (Exception e) {
        }
        return result;
    }


装载commitlog:把commitlog中下的文件都映射成MappedFile,方便读写


CommitLog###load


public boolean load() {
        //跟进去,调用mappedFileQueue.load方法
        boolean result = this.mappedFileQueue.load();
        log.info("load commit log " + (result ? "OK" : "Failed"));
        return result;
    }


MappedFileQueue###load方法:在该方法中把commitlog下的文件映射成MappedFile


public boolean load() {
        //window上默认的目录:C:\Users\25682\store\commitlog
        File dir = new File(this.storePath);
        //上面目录下子文件
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {
                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, please check it manually");
                    return false;
                }
                try {
                    //把每一个文件映射成MappedFile对象,方便读取
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                    //此时wrotePosition设置的为mappedFileSize,不准确
                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }
        return true;
    }


此时CommitLog下的MappedFile的wrotePosition设置为mappedFileSize,但是最后这个MappedFile的wrotePosition还不对,因此下面需要修改


确定Commitlog要写的位置


DefaultMessageStore###recover


private void recover(final boolean lastExitOK) {
        //从ConsumeQueue中获取最大的物理偏移量
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
        if (lastExitOK) {
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            //
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }
        this.recoverTopicQueueTable();
    }


DefaultMessageStore###recoverConsumeQueue:获取每一个主题里每一个队列里的最大commitlog偏移量


private long recoverConsumeQueue() {
        long maxPhysicOffset = -1;
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                logic.recover();
                if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                    maxPhysicOffset = logic.getMaxPhysicOffset();
                }
            }
        }
        return maxPhysicOffset;
    }


CommitLog###recoverAbnormally


public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            int index = mappedFiles.size() - 1;
            //获取最后一个CommitLog的MapperFile
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this mapped file " + mappedFile.getFileName());
                    break;
                }
            }
            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                //不断从MapperFile中根据CommitLog的数据单元格式读取数据,当读取到数据为0时,跳出循环,说明该位置为下个需要写的位置
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();
                if (dispatchRequest.isSuccess()) {
                    // Normal data
                    if (size > 0) {
                        mappedFileOffset += size;
                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                                this.defaultMessageStore.doDispatch(dispatchRequest);
                            }
                        } else {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    }
                    else if (size == 0) {
                        index++;
                        if (index >= mappedFiles.size()) {
                            log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                            break;
                        } else {
                            mappedFile = mappedFiles.get(index);
                            byteBuffer = mappedFile.sliceByteBuffer();
                            processOffset = mappedFile.getFileFromOffset();
                            mappedFileOffset = 0;
                            log.info("recover next physics file, " + mappedFile.getFileName());
                        }
                    }
                } else {
                    log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                    break;
                }
            }
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
             //该位置为真正要插入的位置,所以修正上面的设置的错误的wrotePosition
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            // Clear ConsumeQueue redundant data
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        }
        // Commitlog case files are deleted
        else {
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }


结论


CommitLog一开始是把wrotePosition设置为CommitLog文件的大小,这样只有最后一个CommitLog的wrotePosition的数据是不正确的,所以后面在确定最后一个CommitLog的wrotePosition的时候是通过读取CommitLog文件里的数据来确定wrotePosition位置的,因为CommitLog里前四个字节代表这条消息的大小,这样我读取前四个字节以后就可以读取这一条数据,然后以此类推,当读取消息的大小为0时,代表此处没有消息,则确定wrotePosition的位置。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
消息中间件 缓存 物联网
MQTT常见问题之MQTT发送消息到阿里云服务器被拒如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
7月前
|
消息中间件 Java Apache
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
785 0
|
5天前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
6天前
|
消息中间件 RocketMQ
在RocketMQ中,消息的读写与生产者消费者的数量以及Broker数量都有关
在RocketMQ中,消息的读写与生产者消费者的数量以及Broker数量都有关
30 1
|
6天前
|
消息中间件 RocketMQ
在RocketMQ 5.1.0版本中,当Broker的`enableControllerMode`配置为true时
在RocketMQ 5.1.0版本中,当Broker的`enableControllerMode`配置为true时
121 1
|
6天前
|
存储 缓存 物联网
MQTT常见问题之MQTT发送消息过多内存不够处理不过来如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
7月前
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何发送消息
3分钟白话RocketMQ系列—— 如何发送消息
155 0
|
6天前
|
消息中间件 Oracle Java
【RocketMq】Broker 启动脚本分析
【RocketMq】Broker 启动脚本分析
39 0
|
6月前
|
安全 数据安全/隐私保护
如何为 Mosquitto MQTT Broker 配置 MQTT TLS 和基于证书的授权
如何为 Mosquitto MQTT Broker 配置 MQTT TLS 和基于证书的授权
546 1
|
8月前
|
消息中间件 存储 Kafka
RocketMQ 源码分析——Broker
1. Broker启动流程分析 2. 消息存储设计 3. 消息写入流程 4. 亮点分析:NRS与NRC的功能号设计 5. 亮点分析:同步双写数倍性能提升的CompletableFuture 6. 亮点分析:Commitlog写入时使用可重入锁还是自旋锁? 7. 亮点分析:零拷贝技术之MMAP提升文件读写性能 8. 亮点分析:堆外内存机制
123 0