RocketMQ入门到入土(五)消息持久化存储源码解析(下)

简介: RocketMQ入门到入土(五)消息持久化存储源码解析(下)

5、submitFlushRequest

再次回到【2、commitLog.asyncPutMessage】的submitFlushRequest方法,因为之前的方法是将数据已经写到ByteBuffer缓冲区里了,下一步也就是我们现在这一步就要刷盘了。


public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
                                                              MessageExt messageExt) {
    // 同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                                                                this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // 异步刷盘
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else  {
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}


6、异步刷盘


class FlushRealTimeService extends FlushCommitLogService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
    // 每隔500ms刷一次盘
                if (flushCommitLogTimed) {
                    Thread.sleep(500);
                } else {
                    this.waitForRunning(500);
                }
                // 调用mappedFileQueue的flush方法
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            } catch (Throwable e) {
            }
        }
    }
}


可看出默认是每隔500毫秒刷一次盘


7、mappedFileQueue.flush


public boolean flush(final int flushLeastPages) {
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        // 真正的刷盘操作
        int offset = mappedFile.flush(flushLeastPages);
    }
}


8、mappedFile.flush


public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        try {
            if (writeBuffer != null || this.fileChannel.position() != 0) {
                // 刷盘   NIO
                this.fileChannel.force(false);
            } else {
    // 刷盘  NIO
                this.mappedByteBuffer.force();
            }
        } catch (Throwable e) {
            log.error("Error occurred when force data to disk.", e);
        }
    }
    return this.getFlushedPosition();
}

至此已经全部结束。


四、总结


面试被问:Broker收到消息后怎么持久化的?


回答者:有两种方式:同步和异步。一般选择异步,同步效率低,但是更可靠。消息存储大致原理是:


核心类MappedFile对应的是每个commitlog文件,MappedFileQueue相当于文件夹,管理所有的文件,还有一个管理者CommitLog对象,他负责提供一些操作。具体的是Broker端拿到消息后先将消息、topic、queue等内容存到ByteBuffer里,然后去持久化到commitlog文件中。commitlog文件大小为1G,超出大小会新创建commitlog文件来存储,采取的nio方式。


五、补充:同步/异步刷盘


1、关键类


类名 描述 刷盘性能
CommitRealTimeService 异步刷盘 &&开启字节缓冲区 最高
FlushRealTimeService 异步刷盘&&关闭内存字节缓冲区 较高
GroupCommitService 同步刷盘,刷完盘才会返回消息写入成功 最低


2、图解


image.png


3、同步刷盘

3.1、源码


// {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    // 同步刷盘service -> GroupCommitService
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (messageExt.isWaitStoreMsgOK()) {
        // 数据准备
        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                                                 this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        // 将数据对象放到requestsWrite里
        service.putRequest(request);
        return request.future();
    } else {
        service.wakeup();
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}


putRequest


public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    // 这里很关键!!!,给他设置成true。然后计数器-1。下面run方法的时候才会进行交换数据且return
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}


run


public void run() {
    while (!this.isStopped()) {
        try {
            // 是同步还是异步的关键方法,也就是说组不阻塞全看这里。
            this.waitForRunning(10);
            // 真正的刷盘逻辑
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}


waitForRunning


protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
// 其实就是CountDownLatch
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected void waitForRunning(long interval) {
    // 如果是true,且给他改成false成功的话,则onWaitEnd()且return,但是默认是false,也就是默认情况下这个if不会进。
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }
    //entry to wait
    waitPoint.reset();
    try {
        // 等待,默认值是1,也就是waitPoint.countDown()一次后就会激活这里。
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        // 给状态值设置成false
        hasNotified.set(false);
        this.onWaitEnd();
    }
}


3.2、总结


总结下同步刷盘的主要流程:


核心类是GroupCommitService,核心方法 是waitForRunning。


  • 先调用putRequest方法将hasNotified变为true,且进行notify,也就是waitPoint.countDown()
  • 其次是run方法里的waitForRunning()waitForRunning()判断hasNotified是不是true,是true则交换数据然后return掉,也就是不进行await阻塞,直接return。
  • 最后上一步return了,没有阻塞,那么顺理成章的调用doCommit进行真正意义的刷盘。

4、异步刷盘

4.1、源码


核心类是:FlushRealTimeService


// {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}
// Asynchronous flush
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    flushCommitLogService.wakeup();
} else  {
    commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);


run


// {@link org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run()}
class FlushRealTimeService extends FlushCommitLogService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
    // 每隔500ms刷一次盘
                if (flushCommitLogTimed) {
                    Thread.sleep(500);
                } else {
                    // 根上面同步刷盘调用的是同一个方法,区别在于这里没有将hasNotified变为true,也就是还是默认的false,那么waitForRunning方法内部的第一个判断就不会走,就不会return掉,就会进行下面的await方法阻塞,默认阻塞时间是500毫秒。也就是默认500ms刷一次盘。
                    this.waitForRunning(500);
                }
                // 调用mappedFileQueue的flush方法
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            } catch (Throwable e) {
            }
        }
    }
}


4.2、总结


核心类#方法:FlushRealTimeService#run()


  • 判断flushCommitLogTimed是不是true,默认false,是true则直接sleep(500ms)然后进行mappedFileQueue.flush()刷盘。
  • 若是false,则进入waitForRunning(500),这里是和同步刷盘的区别关键所在,同步刷盘之前将hasNotified变为true了,所以直接一套小连招:return+doCommit了 ,异步这里直接调用的waitForRunning(500),在这之前没任何对hasNotified的操作,所以不会return,而是会继续走下面的waitPoint.await(500, TimeUnit.MILLISECONDS);进行阻塞500毫秒,500毫秒后自动唤醒然后进行flush刷盘。也就是异步刷盘的话默认500ms刷盘一次。


END

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1天前
|
Linux 网络安全 Windows
网络安全笔记-day8,DHCP部署_dhcp搭建部署,源码解析
网络安全笔记-day8,DHCP部署_dhcp搭建部署,源码解析
|
2天前
HuggingFace Tranformers 源码解析(4)
HuggingFace Tranformers 源码解析
6 0
|
2天前
HuggingFace Tranformers 源码解析(3)
HuggingFace Tranformers 源码解析
6 0
|
2天前
|
开发工具 git
HuggingFace Tranformers 源码解析(2)
HuggingFace Tranformers 源码解析
6 0
|
2天前
|
并行计算
HuggingFace Tranformers 源码解析(1)
HuggingFace Tranformers 源码解析
8 0
|
4天前
PandasTA 源码解析(二十三)
PandasTA 源码解析(二十三)
41 0
|
4天前
PandasTA 源码解析(二十二)(3)
PandasTA 源码解析(二十二)
34 0
|
4天前
PandasTA 源码解析(二十二)(2)
PandasTA 源码解析(二十二)
41 2
|
4天前
PandasTA 源码解析(二十二)(1)
PandasTA 源码解析(二十二)
33 0
|
4天前
PandasTA 源码解析(二十一)(4)
PandasTA 源码解析(二十一)
24 1

推荐镜像

更多