RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: **RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。

RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析

RocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的
,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

1 Netty客户端/服务端运行

20231020-01.png

1.1 Broker端Netty服务器

  • BrokerController引导启动Netty服务器

创建NettyRemotingServer对象并调用start()方法来启动Netty服务器

public class BrokerController {
   
   
    protected void initializeRemotingServer() {
   
   
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        ...
    }
    protected void startBasicService() throws Exception {
   
   
        ...
        if (this.remotingServer != null) {
   
   
            this.remotingServer.start();
        }
        ...
    }
}
  • Netty服务端启动

这里就是我们熟悉的Netty的ServerBootstrap服务端引导类,通过设置EventLoopGroup然后绑定端口接着添加一系列的ChannelHandler启动服务器;
对于RocketMQ来说主要的Handler是NettyServerHandler这个处理类,它主要负责接收客户端请求并进行处理。

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
   
   
    // sharable handlers
    private NettyServerHandler serverHandler;

    @Override
    public void start() {
   
   
        ...
        // 用于处理客户端请求命令
        serverHandler = new NettyServerHandler();
        ...
        serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                ...
                .childHandler(new ChannelInitializer<SocketChannel>() {
   
   
                    @Override
                    public void initChannel(SocketChannel ch) {
   
   
                        ch.pipeline()
                                ...
                                .addLast(defaultEventExecutorGroup, ..., serverHandler);
                    }
                });
        try {
   
   
            ChannelFuture sync = serverBootstrap.bind().sync();
            ...
        } catch (Exception e) {
   
   
            ...
        }
        ...
    }
}

1.2 Producer/Consumer端Netty客户端

Producer和Consumer的启动最终都会调用mQClientFactory.start()来创建Netty客户端

// Producer
public class DefaultMQProducerImpl implements MQProducerInner {
   
   

    private MQClientInstance mQClientFactory;

    public void start(final boolean startFactory) throws MQClientException {
   
   
        switch (this.serviceState) {
   
   
            case CREATE_JUST:
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                ...
                // 这里创建Netty客户端
                mQClientFactory.start();
                ...
            case RUNNING:
            ...
            default:
                break;
        }
        ...
    }
}
// Consumer
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   
   

    private MQClientInstance mQClientFactory;

    public synchronized void start() throws MQClientException {
   
   
        switch (this.serviceState) {
   
   
            case CREATE_JUST:
                ...
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                ...
                // 这里创建Netty客户端
                mQClientFactory.start();
                ...
            case RUNNING:
            ...
            default:
                break;
        }
        ...
    }
}
  • 创建Netty客户端对象

执行mQClientFactory.start() --> this.mQClientAPIImpl.start() --> this.remotingClient.start(),最终NettyRemotingClient对象调用start()方法来创建Netty客户端;
Netty客户端由Bootstrap引导程序创建,之后请求/响应通过Netty客户端处理。

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
   
   
    @Override
    public void start() {
   
   
        this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
                ...
                .handler(new ChannelInitializer<SocketChannel>() {
   
   
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
   
   
                        ...
                        ch.pipeline().addLast(..., new NettyClientHandler());
                    }
                });
        ...
    }
}

2 消息的生产及保存

20231020-02.png

2.1 Producer生产消息到Broker

我们调用producer.send发送消息时,程序会使用RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE)把消息封装为自定义的通信协议RemotingCommand,
之后NettyRemotingClient会找到Broker地址并建立连接生成Channel对象调用writeAndFlush方法将请求(RemotingCommand)发送到Netty服务器

  • 使用producer.send发送同步消息
public class Producer {
   
   
    public static void main(String[] args) throws MQClientException, InterruptedException {
   
   
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.start();
        Message msg = new Message(TOPIC /* Topic */,
                TAG /* Tag */,
                ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        SendResult sendResult = producer.send(msg);
    }
}
  • 封装为RemotingCommand消息协议进一步执行发送逻辑
public class MQClientAPIImpl implements NameServerUpdateCallback {
   
   
    public SendResult sendMessage(final Message msg, final SendMessageRequestHeader requestHeader, ...) {
   
   
        // 构建请求命令 RequestCode.SEND_MESSAGE
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        request.setBody(msg.getBody());
        switch (communicationMode) {
   
   
            ...
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            ...
        }
        return null;
    }
}
  • 调用与服务端建立的Channel的writeAndFlush方法将请求RemotingCommand发送到Broker
public abstract class NettyRemotingAbstract {
   
   
    public RemotingCommand invokeSyncImpl(Channel channel, RemotingCommand request) {
   
   
        ...
        try {
   
   
            ...
            // 向服务端Broker通道发送消息
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
   
   
                ...
            });
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            ...
            return responseCommand;
        } 
        ...
    }
}

2.2 Broker接收消息并保存

在开始介绍说Broker中NettyServer的启动时会添加NettyServerHandler一个处理器,这个handler负责处理client发过来的请求指令。

上面当客户端Producer发送RemotingCommand(RequestCode.SEND_MESSAGE)这个指令的请求时,Broker收到请求后通过RequestCode
找到对应的SendMessageProcessor处理器执行processRequest方法去处理消息接收后的逻辑。

  • SendMessageProcessor处理器来接收消息调用MessageStore保存消息
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
   
   

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
   
   
        ...
        response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
        ...
    }

    public RemotingCommand sendMessage(...final RemotingCommand request, ...) throws RemotingCommandException {
   
   
        ...
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        ...
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        ...
    }
}
  • MessageStore保存消息到CommitLog中
public class DefaultMessageStore implements MessageStore {
   
   

    @Override
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
   
   
        return waitForPutResult(asyncPutMessages(messageExtBatch));
    }

    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
   
   
        ...
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
        ...
    }
}
public class CommitLog implements Swappable {
   
   
    public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
   
   
        ...
        result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
        ...
    }
}

3 消息的拉取及消费

20231020-03.pngRocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的
,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。

3.1 Consumer向Broker发送拉取请求

创建Consumer实例,订阅Topic并注册MessageListener后调用start方法启动程序;

接着开启一个PullMessageService任务去向Broker发送消息拉取请求,通过DefaultMQPushConsumerImpl.pullMessage方法设置请求回调逻辑(如:获取到消息则使用MessageListener去消费),
接着继续执行MQClientAPIImpl().pullMessage将请求信息封装为RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE),最终由NettyRemotingClient发送请求到Broker。

  • 创建DefaultMQPushConsumer调用start()启动消费者程序,PullMessageService任务开启
public class Consumer {
   
   

    public static void main(String[] args) throws MQClientException {
   
   
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
   
   
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

public class MQClientInstance {
   
   

    public void start() throws MQClientException {
   
   
        ...
        // Start pull service
        this.pullMessageService.start();
        ...
    }
}
  • 这里会设置回调逻辑等并继续调用底层实现发起请求,开始注册的MessageListener会在这里调用的
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   
   

    public void pullMessage(final PullRequest pullRequest) {
   
   
        ...
        PullCallback pullCallback = new PullCallback() {
   
   
            @Override
            public void onSuccess(PullResult pullResult) {
   
   
                if (pullResult != null) {
   
   
                    switch (pullResult.getPullStatus()) {
   
   
                        // 获取到消息
                        case FOUND:
                            ...
                            // 这个里面会执行注册的MessageListener
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), ...);
                            ...
                            // 继续拉取消息
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            ...
                        ...
                    }
                }
            }
            ...
        };
        ...
        try {
   
   
            this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), ..., pullRequest.getNextOffset(), ..., pullCallback);
        } catch (Exception e) {
   
   
            ...
        }
    }
}

上面执行到pullAPIWrapper.pullKernelImpl会调用MQClientAPIImpl().pullMessage来封装请求报文最终交由NettyRemotingClient.invokeSyncImpl真正发出请求。

  • 封装请求报文RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE)
public class MQClientAPIImpl implements NameServerUpdateCallback {
   
   
    public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
   
   
        ...
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        ...
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        ...
    }
}
  • RemotingClient调用channel.writeAndFlush(request)发出拉取请求
public abstract class NettyRemotingAbstract {
   
   
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
   
   
        ...
        try {
   
   
            ...
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
   
   
                ...
            });
            ...
        } 
        ...
    }
}

3.2 Broker接收拉取请求匹配返回消息

同样Broker中NettyServerHandler收到RemotingCommand(RequestCode.PULL_MESSAGE)这个指令找到PullMessageProcessor调用processRequest方法去处理。

  • PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
public class PullMessageProcessor implements NettyRequestProcessor {
   
   
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, ...) {
   
   
        ...
        messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter)
                .thenApply(result -> {
   
   
                    ...
                })
                // 写回客户端
                .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
        ...
    }
}
  • PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
public class DefaultMessageStore implements MessageStore {
   
   
    @Override
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, ...) {
   
   
        ...
        GetMessageResult getResult = new GetMessageResult();
        ...
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
        ...
        return getResult;
    }
}
  • 从commitLog中获取消息
public class CommitLog implements Swappable {
   
   
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
   
   
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
   
   
            int pos = (int) (offset % mappedFileSize);
            SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(pos, size);
            if (null != selectMappedBufferResult) {
   
   
                selectMappedBufferResult.setInCache(coldDataCheckService.isDataInPageCache(offset));
                return selectMappedBufferResult;
            }
        }
        return null;
    }
}

最后

至此我们把RocketMQ中Broker与生产者/消费者基于Netty简单的通信调用链路讲完了,大家有什么问题可以下面留言哦,一起学习进步啊。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
5月前
|
消息中间件 Java 中间件
详解rocketMq通信模块&升级构想(上)
详解rocketMq通信模块&升级构想(上)
109 0
|
5月前
|
消息中间件 Java 应用服务中间件
详解rocketMq通信模块&升级构想(下)
详解rocketMq通信模块&升级构想(下)
196 0
详解rocketMq通信模块&升级构想(下)
|
5月前
|
Java API 网络架构
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码
173 0
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
|
4月前
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
38 0
|
6月前
|
存储 传感器 物联网
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
293 0
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
|
5月前
|
消息中间件 Apache RocketMQ
电子好书发您分享《Apache RocketMQ 源码解析》
电子好书发您分享《Apache RocketMQ 源码解析》
34 1
|
4月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
32 0
|
2月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
2月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1

相关产品

  • 云消息队列 MQ