一、原理
1、消息存在哪了?
消息持久化的地方其实是磁盘上,在如下目录里的commitlog文件夹里。
/root/store/commitlog
源码如下:
// {@link org.apache.rocketmq.store.config.MessageStoreConfig} // 数据存储根目录 private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; // commitlog目录 private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; // 每个commitlog文件大小为1GB,超过1GB则创建新的commitlog文件 private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
比如验证下:
[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# pwd /root/store/commitlog [root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# ll -h total 400K -rw-r--r-- 1 root root 1.0G Jun 30 18:21 00000000000000000000 [root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]#
可以清晰的看到文件大小是1.0G,超过1.0G再写入消息的话会自动创建新的commitlog文件。
2、关键类解释
2.1、MappedFile
对应的是commitlog文件,比如上面的00000000000000000000
文件。
2.2、MappedFileQueue
是MappedFile
所在的文件夹,对 MappedFile
进行封装成文件队列。
2.3、CommitLog
针对 MappedFileQueue
的封装使用。
二、Broker接收消息
1、调用链
BrokerStartup.start() -》 BrokerController.start() -》 NettyRemotingServer.start() -》 NettyRemotingServer.prepareSharableHandlers() -》 new NettyServerHandler() -》 NettyRemotingAbstract.processMessageReceived() -》 NettyRemotingAbstract.processRequestCommand() -》 SendMessageProcessor.processRequest()
2、processRequest
SendMessageProcessor.processRequest()
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { RemotingCommand response = null; try { // 调用asyncProcessRequest response = asyncProcessRequest(ctx, request).get(); } catch (InterruptedException | ExecutionException e) { log.error("process SendMessage error, request : " + request.toString(), e); } return response; }
3、asyncProcessRequest
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final SendMessageContext mqtraceContext; switch (request.getCode()) { // 表示消费者发送的消息,发送者消费失败会重新发回队列进行消息重试 case RequestCode.CONSUMER_SEND_MSG_BACK: return this.asyncConsumerSendMsgBack(ctx, request); default: // 解析header,也就是我们Producer发送过来的消息都在request里,给他解析到SendMessageRequestHeader对象里去。 SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return CompletableFuture.completedFuture(null); } mqtraceContext = buildMsgContext(ctx, requestHeader); // 将解析好的参数放到SendMessageContext对象里 this.executeSendMessageHookBefore(ctx, request, mqtraceContext); if (requestHeader.isBatch()) { // 批处理消息用 return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { // 非批处理,我们这里介绍的核心。 return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); } } }
4、asyncSendMessage
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) { final byte[] body = request.getBody(); int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); // 拼凑message对象 MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); CompletableFuture<PutMessageResult> putMessageResult = null; Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); // 真正接收消息的方法 putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); }
至此我们的消息接收完成了,都封装到了MessageExtBrokerInner对象里。
三、Broker消息存储(持久化)
1、asyncPutMessage
接着上步骤的asyncSendMessage继续看
@Override public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) { CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg); putResultFuture.thenAccept((result) -> { ...... }); return putResultFuture; }
2、commitLog.asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // 获取最后一个文件,MappedFile就是commitlog目录下的那个0000000000文件 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); try { // 追加数据到commitlog result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { ...... } // 将内存的数据持久化到磁盘 CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg); } }
3、appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { // 将消息写到内存 return cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); }
4、doAppend
@Override public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); return result; }
这一步其实就已经把消息保存到缓冲区里了,也就是msgStoreItemMemory,这里采取的NIO。
private final ByteBuffer msgStoreItemMemory;