开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):同步刷盘分析】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12488
同步刷盘分析
同步刷盘
RocketMQ 的存储是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。再根据刷盘的策略来决定到底是同步刷盘还是异步刷盘。
消息追加的入口在 CommitLog 中有个 putMessages 里。前面做的都是将数据追加到内存里去,再根据不同的刷盘策略,再去将数据刷到磁盘当中。
handleDiskFlush(result,putMessageResult,messageExtBatch);
//刷盘入口
进入到putMessageResult。先判断 MessageStoreConfig 当中刷盘的策略。SYNC_FLUSH 是同步刷盘。
同步刷盘的意思是消息追加到内存后,立即将数据刷写到磁盘文件。同步刷盘的逻辑是这样的。
finalGroupCommitService service = (GroupCommitService) this.flushCommitLogService;
//拿到同步刷盘的服务 GroupCommitService ,
GroupCommitRequest request = new
GroupcommitRequest(nextOffset:result.getWroteOffset() +
result.getWroteBytes());
//创建刷盘的请求对象 GroupCommitRequest
service.putRequest(request);
//将请求对象提交到 service ,在 service 当中进行了处理
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
// waitForFlush 这个请求等待刷盘的结果。
private int syncFlushTimeout =1000 * 5;
// 等待超时时间 getSyncFlushTimeout 为5秒,就是会同步阻塞5秒。
把 request 提交到 service.putRequest 当中, service 用了等待唤醒机制去进行一个处理。
public synchronized void putRequest(final GroupCommitRequest request){
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(expect:false,update:true)){
waitPoint.countDown();
//唤醒线程notify
}
这里首先是把这个请求去附着到一个集合当中,
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<~>();
//写集合
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<~>();
//读集合
首先将 request 对象放到 requestsWrite 当中去,然后在处理的时候,为了提高读写的效率,将数据会复制到 requestsRead 中去,然后前后分开去处理。我们将请求对象提交到 requestWrite 这个集合当中,这里面唤醒了一下这个线程 notify 。
线程唤醒之后看run 方法,run 方法最核心地方在 doCommit ,处理请求的时候,会间隔10毫秒去处理一次,有请求就会去处理。
while (!this.isStopped()) {
try {
this.waitForRunning( interval: 10);//处理请求间隔10毫秒
this.doCommit(); //run 方法核心
} catch (Exception e) {
CommitLog.Log.warn(this.getserviceName() + " service has exception. ", e);
}
}
在处理的时候,把 While循环开启,然后在下边去做一个 swapRequests 方式
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
swapRequests 这个方式就是在交换读集合和写集合。
最终再去处理的时候是从 requestsRead 集合当中去取。
所以从 run 方法中可以看到,doCommit 被触发了之后,就会取这个数据,如果 requestsRead 这里没有数据,就会去阻塞。
一旦有请求到来之后,它先将两个集合的数据进行一个交换,然后再让 doCommit 去处理。
synchronized (this) {
this.swapRequests();
}
如上图,首先去遍历当前集合 req 当中所有的请求对象
然后还有一个遍历 flushOK 会遍历两次,
For (int i = 0;i < 2 && !flushOK; i++){
flushOK = CommitLog.this.MappedFileQueue.getFlushedWhere() >= rep.getNextOffset();
If (!flushOK){
CommitLog.this.mappedFileQueue.flush(flushLeastPages:0);
如果文件数据存在跨文件存储,那么它要把前一个文件给它去拿到,再把后一个文件也拿到,然后这两个文件拼接起来之后才是一个完整的消息数据。这里主要考虑到消息的跨文件的问题。
然后就进行刷盘的处理
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
//进行判断
刷完盘之后会进行一个判断,判断刷盘的位置 getFlushedWhere 如果大于当前写的位置 req.getNextOffset ,那就代表当前刷盘成功,之后正常返回。
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
//更新刷盘存储点
最终更新到磁盘中如下图所示位置。