RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
RocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的
,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。
Tip
:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
1 Netty客户端/服务端运行
1.1 Broker端Netty服务器
- BrokerController引导启动Netty服务器
创建NettyRemotingServer对象并调用start()方法来启动Netty服务器
public class BrokerController {
protected void initializeRemotingServer() {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
...
}
protected void startBasicService() throws Exception {
...
if (this.remotingServer != null) {
this.remotingServer.start();
}
...
}
}
- Netty服务端启动
这里就是我们熟悉的Netty的ServerBootstrap服务端引导类,通过设置EventLoopGroup然后绑定端口接着添加一系列的ChannelHandler启动服务器;
对于RocketMQ来说主要的Handler是NettyServerHandler这个处理类,它主要负责接收客户端请求并进行处理。
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
// sharable handlers
private NettyServerHandler serverHandler;
@Override
public void start() {
...
// 用于处理客户端请求命令
serverHandler = new NettyServerHandler();
...
serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
...
.addLast(defaultEventExecutorGroup, ..., serverHandler);
}
});
try {
ChannelFuture sync = serverBootstrap.bind().sync();
...
} catch (Exception e) {
...
}
...
}
}
1.2 Producer/Consumer端Netty客户端
Producer和Consumer的启动最终都会调用mQClientFactory.start()来创建Netty客户端
// Producer
public class DefaultMQProducerImpl implements MQProducerInner {
private MQClientInstance mQClientFactory;
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
...
// 这里创建Netty客户端
mQClientFactory.start();
...
case RUNNING:
...
default:
break;
}
...
}
}
// Consumer
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private MQClientInstance mQClientFactory;
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
...
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
...
// 这里创建Netty客户端
mQClientFactory.start();
...
case RUNNING:
...
default:
break;
}
...
}
}
- 创建Netty客户端对象
执行mQClientFactory.start() --> this.mQClientAPIImpl.start() --> this.remotingClient.start(),最终NettyRemotingClient对象调用start()方法来创建Netty客户端;
Netty客户端由Bootstrap引导程序创建,之后请求/响应通过Netty客户端处理。
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@Override
public void start() {
this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
...
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
...
ch.pipeline().addLast(..., new NettyClientHandler());
}
});
...
}
}
2 消息的生产及保存
2.1 Producer生产消息到Broker
我们调用producer.send发送消息时,程序会使用RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE)把消息封装为自定义的通信协议RemotingCommand,
之后NettyRemotingClient会找到Broker地址并建立连接生成Channel对象调用writeAndFlush方法将请求(RemotingCommand)发送到Netty服务器
- 使用producer.send发送同步消息
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.start();
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
}
}
- 封装为RemotingCommand消息协议进一步执行发送逻辑
public class MQClientAPIImpl implements NameServerUpdateCallback {
public SendResult sendMessage(final Message msg, final SendMessageRequestHeader requestHeader, ...) {
// 构建请求命令 RequestCode.SEND_MESSAGE
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
request.setBody(msg.getBody());
switch (communicationMode) {
...
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
...
}
return null;
}
}
- 调用与服务端建立的Channel的writeAndFlush方法将请求RemotingCommand发送到Broker
public abstract class NettyRemotingAbstract {
public RemotingCommand invokeSyncImpl(Channel channel, RemotingCommand request) {
...
try {
...
// 向服务端Broker通道发送消息
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
...
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
...
return responseCommand;
}
...
}
}
2.2 Broker接收消息并保存
在开始介绍说Broker中NettyServer的启动时会添加NettyServerHandler一个处理器,这个handler负责处理client发过来的请求指令。
上面当客户端Producer发送RemotingCommand(RequestCode.SEND_MESSAGE)这个指令的请求时,Broker收到请求后通过RequestCode
找到对应的SendMessageProcessor处理器执行processRequest方法去处理消息接收后的逻辑。
- SendMessageProcessor处理器来接收消息调用MessageStore保存消息
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
...
response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
(ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
...
}
public RemotingCommand sendMessage(..., final RemotingCommand request, ...) throws RemotingCommandException {
...
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
...
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
...
}
}
- MessageStore保存消息到CommitLog中
public class DefaultMessageStore implements MessageStore {
@Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
return waitForPutResult(asyncPutMessages(messageExtBatch));
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
...
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
...
}
}
public class CommitLog implements Swappable {
public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
...
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
...
}
}
3 消息的拉取及消费
RocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的
,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。
3.1 Consumer向Broker发送拉取请求
创建Consumer实例,订阅Topic并注册MessageListener后调用start方法启动程序;
接着开启一个PullMessageService任务去向Broker发送消息拉取请求,通过DefaultMQPushConsumerImpl.pullMessage方法设置请求回调逻辑(如:获取到消息则使用MessageListener去消费),
接着继续执行MQClientAPIImpl().pullMessage将请求信息封装为RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE),最终由NettyRemotingClient发送请求到Broker。
- 创建DefaultMQPushConsumer调用start()启动消费者程序,PullMessageService任务开启
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
public class MQClientInstance {
public void start() throws MQClientException {
...
// Start pull service
this.pullMessageService.start();
...
}
}
- 这里会设置回调逻辑等并继续调用底层实现发起请求,开始注册的MessageListener会在这里调用的
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
...
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
switch (pullResult.getPullStatus()) {
// 获取到消息
case FOUND:
...
// 这个里面会执行注册的MessageListener
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), ...);
...
// 继续拉取消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
...
...
}
}
}
...
};
...
try {
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), ..., pullRequest.getNextOffset(), ..., pullCallback);
} catch (Exception e) {
...
}
}
}
上面执行到pullAPIWrapper.pullKernelImpl会调用MQClientAPIImpl().pullMessage来封装请求报文最终交由NettyRemotingClient.invokeSyncImpl真正发出请求。
- 封装请求报文RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE)
public class MQClientAPIImpl implements NameServerUpdateCallback {
public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
...
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
...
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
...
}
}
- RemotingClient调用channel.writeAndFlush(request)发出拉取请求
public abstract class NettyRemotingAbstract {
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
...
try {
...
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
...
});
...
}
...
}
}
3.2 Broker接收拉取请求匹配返回消息
同样Broker中NettyServerHandler收到RemotingCommand(RequestCode.PULL_MESSAGE)这个指令找到PullMessageProcessor调用processRequest方法去处理。
- PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
public class PullMessageProcessor implements NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, ...) {
...
messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter)
.thenApply(result -> {
...
})
// 写回客户端
.thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
...
}
}
- PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
public class DefaultMessageStore implements MessageStore {
@Override
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, ...) {
...
GetMessageResult getResult = new GetMessageResult();
...
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
...
getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
...
return getResult;
}
}
- 从commitLog中获取消息
public class CommitLog implements Swappable {
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(pos, size);
if (null != selectMappedBufferResult) {
selectMappedBufferResult.setInCache(coldDataCheckService.isDataInPageCache(offset));
return selectMappedBufferResult;
}
}
return null;
}
}
最后
至此我们把RocketMQ中Broker与生产者/消费者基于Netty简单的通信调用链路讲完了,大家有什么问题可以下面留言哦,一起学习进步啊。
Tip
:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq