RocketMQ入门到入土(五)消息持久化存储源码解析(上)

简介: RocketMQ入门到入土(五)消息持久化存储源码解析(上)

一、原理

1、消息存在哪了?

消息持久化的地方其实是磁盘上,在如下目录里的commitlog文件夹里。


/root/store/commitlog


源码如下:


// {@link org.apache.rocketmq.store.config.MessageStoreConfig}
// 数据存储根目录
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
// commitlog目录
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog";
// 每个commitlog文件大小为1GB,超过1GB则创建新的commitlog文件
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;


比如验证下:


[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# pwd
/root/store/commitlog
[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# ll -h
total 400K
-rw-r--r-- 1 root root 1.0G Jun 30 18:21 00000000000000000000
[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]#


可以清晰的看到文件大小是1.0G,超过1.0G再写入消息的话会自动创建新的commitlog文件。

2、关键类解释

2.1、MappedFile


对应的是commitlog文件,比如上面的00000000000000000000文件。


2.2、MappedFileQueue


MappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列。


2.3、CommitLog


针对 MappedFileQueue 的封装使用。


二、Broker接收消息

1、调用链

BrokerStartup.start() -》 BrokerController.start() -》 NettyRemotingServer.start() -》 NettyRemotingServer.prepareSharableHandlers() -》 new NettyServerHandler() -》 NettyRemotingAbstract.processMessageReceived() 
-》 NettyRemotingAbstract.processRequestCommand() -》 SendMessageProcessor.processRequest()

2、processRequest

SendMessageProcessor.processRequest()


@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        // 调用asyncProcessRequest
        response = asyncProcessRequest(ctx, request).get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("process SendMessage error, request : " + request.toString(), e);
    }
    return response;
}

3、asyncProcessRequest

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        // 表示消费者发送的消息,发送者消费失败会重新发回队列进行消息重试
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            // 解析header,也就是我们Producer发送过来的消息都在request里,给他解析到SendMessageRequestHeader对象里去。
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // 将解析好的参数放到SendMessageContext对象里
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                // 批处理消息用
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // 非批处理,我们这里介绍的核心。
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}

4、asyncSendMessage

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
    final byte[] body = request.getBody();
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // 拼凑message对象
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    CompletableFuture<PutMessageResult> putMessageResult = null;
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // 真正接收消息的方法
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}


至此我们的消息接收完成了,都封装到了MessageExtBrokerInner对象里。


三、Broker消息存储(持久化)

1、asyncPutMessage

接着上步骤的asyncSendMessage继续看


@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    putResultFuture.thenAccept((result) -> {
        ......
    });
    return putResultFuture;
}

2、commitLog.asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 获取最后一个文件,MappedFile就是commitlog目录下的那个0000000000文件
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    try {
        // 追加数据到commitlog
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            ......
        }
        // 将内存的数据持久化到磁盘
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    }
}

3、appendMessagesInner

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    // 将消息写到内存
    return cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
}

4、doAppend

@Override
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                                    final MessageExtBrokerInner msgInner) {
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(bornHostHolder, bornHostLength);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(storeHostHolder, storeHostLength);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    return result;
}



这一步其实就已经把消息保存到缓冲区里了,也就是msgStoreItemMemory,这里采取的NIO。


private final ByteBuffer msgStoreItemMemory;
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
PandasTA 源码解析(二十三)
PandasTA 源码解析(二十三)
7 0
|
2天前
PandasTA 源码解析(二十二)(3)
PandasTA 源码解析(二十二)
5 0
|
2天前
PandasTA 源码解析(二十二)(2)
PandasTA 源码解析(二十二)
9 2
|
2天前
PandasTA 源码解析(二十二)(1)
PandasTA 源码解析(二十二)
7 0
|
2天前
PandasTA 源码解析(二十一)(4)
PandasTA 源码解析(二十一)
7 1
|
2天前
PandasTA 源码解析(二十一)(3)
PandasTA 源码解析(二十一)
6 0
|
2天前
PandasTA 源码解析(二十一)(2)
PandasTA 源码解析(二十一)
7 1
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
628 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
316 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
473 0
RocketMQ Schema——让消息成为流动的结构化数据

热门文章

最新文章

推荐镜像

更多