【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

简介: 【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

承接【【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)】

pullBlockIfNotFound方法

通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置,可以调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)方法。

java

复制代码

public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}

接下面我来分析DefaultMQPullConsumerImpl.pullSyncImpl源码实现如下:

pullSyncImpl方法的定义

java

复制代码

private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
参数说明
  • MessageQueue mq:需要进行拉去的消息队列。
  • String subExpression:tag的标签值
  • long offset:消息数据偏移量
  • int maxNums:拉去的消息的最大数量
  • boolean block:是否进行阻塞拉去
  • long timeout:拉取数据的超时时间
参数数据校验

如下所示,校验对应的MessageQueue对象、offset偏移量和拉取的最大数据量是否合法。

java

复制代码

this.makeSureStateOK();
        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }
        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
检查MessageQueue对象的topic是否存在

检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData> 变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression) 方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*"。

java

复制代码

this.subscriptionAutomatically(mq.getTopic());
构建标志位,逻辑或运算|=

java

复制代码

int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
构建SubscriptionData对象(buildSubscriptionData)

请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回。

java

复制代码

SubscriptionData subscriptionData;
try {
            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                mq.getTopic(), subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
、}
从broker中拉取消息(pullAPIWrapper.pullKernelImpl)

调用PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法从Broker拉取消息内容。

java

复制代码

long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            0L,
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );

对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中。

pullKernelImpl底层调用机制

调用PullAPIWrapper.processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData)方法对拉取消息的响应结果进行处理,主要是消息反序列化。

java

复制代码

public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }
        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            // 设置broker的最长阻塞时间,默认是15秒,broker只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回null,如果有消息会立即返回
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);
            return pullResult;
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
  • 看到首先代码调用MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) 方法查找Broker地址,其中onlyThisBroker=false,表示若指定的brokerId未获取到地址则获取其他BrokerId的地址也行。
  • 方法中根据brokerName和brokerId参数从MQClientInstance.brokerAddrTable->ConcurrentHashMap<, HashMap变量中获取对应的Broker地址,若获取不到则从brokerName下面的Map列表中找其他地址返回即可。
  • 判断是否为空,若在上一步未获取到Broker地址,topic参数调用MQClientInstance.updateTopicRouteInfoFromNameServer(String topic)方法,然后在执行操作MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) ,直到获取到Broker地址为止;
  • 根据topic参数值从MQClientInstance.topicRouteTable: ConcurrentHashMapTopicRouteData>变量中获取TopicRouteData对象,
  • Broker地址为参数从该TopicRouteData对象的filterServerTable:HashMap变量中获取该Broker下面的所有Filter服务器地址列表;
  • 若该地址列表不为空,则随机选择一个Filter服务器地址返回;否则向调用层抛出异常,该pullKernelImpl方法结束;
获取对应的broker服务

若获取的Broker地址是备用Broker,则将标记位sysFlag的第1个字节置为0,即在消费完之后不提交 消费进度;

检查标记位sysFlag的第4个字节(即SubscriptionData. classFilterMode)是否为1;若等于1,则调用PullAPIWrapper.computPullFromWhichFilterServer(String topic, String brokerAddr)方法获取Filter服务器地址。大致逻辑如下:

java

复制代码

int sysFlagInner = sysFlag;
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
初始化PullMessageRequestHeader对象

调用MQClientAPIImpl.pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法向Broker地址或者Filter地址发送PULL_MESSAGE请求信息。

构建PullMessageRequestHeader对象,其中queueOffset变量值等于入参offset,设置broker的最长阻塞时间,默认是15秒,broker只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回null,如果有消息会立即返回。

java

复制代码

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

获取的Filter服务器发送PULL_MESSAGE请求信息,否则向Broker发送PULL_MESSAGE请求信息。

pullMessage方法处理

java

复制代码

PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);
  return pullResult;
代码实现机制(同步+异步)

在MQClientAPIImpl.pullMessage方法中,根据入参communicationMode的值分为异步拉取和同步拉取方式两种。

java

复制代码

public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }
        return null;
    }

无论是异步方式拉取还是同步方式拉取,在发送拉取请求之前都会构造一个ResponseFuture对象,以请求消息的序列号为key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>变量中,对该变量有几种情况会处理:

  1. 发送失败后直接删掉responseTable变量中的相应记录;
  2. 收到响应消息之后,会以响应消息中的序列号(由服务端根据请求消息的序列号原样返回)从responseTable中查找ResponseFuture对象,并设置该对象的responseCommand变量。若是同步发送会唤醒等待响应的ResponseFuture.waitResponse方法;若是异步发送会调用ResponseFuture.executeInvokeCallback()方法完成回调逻辑处理;
pullMessageSync调用操作(对于同步拉取)

同步拉取方式,调用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法。大致步骤如下:

  1. 调用RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:

java

复制代码

@Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
  • 获取Broker地址的Channel信息。根据broker地址从RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>变量中获取ChannelWrapper对象并返回该对象的Channel变量;若没有ChannelWrapper对象则与broker地址建立新的连接并将连接信息存入channelTables变量中,便于下次使用;
  • 若NettyRemotingClient.rpcHook:RPCHook变量不为空(该变量在应用层初始化DefaultMQPushConsumer或者DefaultMQPullConsumer对象传入该值),则调用RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
NettyRemotingAbstract.invokeSyncImpl(同步请求)

调用NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,该方法的逻辑如下:

  1. 使用请求的序列号(opaue)、超时时间初始化ResponseFuture对象;并将该ResponseFuture对象存入NettyRemotingAbstract.responseTable: ConcurrentHashMap变量中;
  2. 调用Channel.writeAndFlush(Object msg)方法将请求对象RemotingCommand发送给Broker;然后调用addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法添加内部匿名类:该内部匿名类实现了ChannelFutureListener接口的operationComplete方法,在发送完成之后回调该监听类的operationComplete方法,在该方法中,首先调用ChannelFuture. isSuccess()方法检查是否发送成功,若成功则置ResponseFuture对象的sendRequestOK等于true并退出此回调方法等待响应结果;若不成功则置ResponseFuture对象的sendRequestOK等于false,然后从NettyRemotingAbstract.responseTable中删除此请求序列号(opaue)的记录,置ResponseFuture对象的responseCommand等于null,并唤醒ResponseFuture.waitResponse(long timeoutMillis)方法的等待;
  3. 调用ResponseFuture.waitResponse(long timeoutMillis)方法等待响应结果;在发送失败或者收到响应消息或者超时的情况下会唤醒该方法返回ResponseFuture.responseCommand变量值;
  4. 若上一步返回的responseCommand值为null,则抛出异常:若ResponseFuture.sendRequestOK为true,则抛出RemotingTimeoutException异常,否则抛出RemotingSendRequestException异常;
  5. 若上一步返回的responseCommand值不为null,则返回responseCommand变量值;
结果返回
  1. 若NettyRemotingClient.rpcHook: RPCHook变量不为空,则调用RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;

2、以上一步的返回值RemotingCommand对象为参数调用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法将返回对象解析并封装成PullResultExt对象然后返回给调用者,响应消息的结果状态转换如下:

  • 若RemotingCommand对象的Code等于SUCCESS,则PullResultExt.pullStatus=FOUND;
  • 若RemotingCommand对象的Code等于PULL_NOT_FOUND,则PullResultExt.pullStatus= NO_NEW_MSG;
  • 若RemotingCommand对象的Code等于PULL_RETRY_IMMEDIATELY,则PullResultExt.pullStatus= NO_MATCHED_MSG;
  • 若RemotingCommand对象的Code等于PULL_OFFSET_MOVED,则PullResultExt.pullStatus= OFFSET_ILLEGAL;
定时请求响应结果列表

在NettyRemotingClient.start()启动时,也会初始化定时任务,该定时任务每隔1秒定期扫描responseTable列表,遍历该列表中的ResponseFuture对象,检查等待响应是否超时,若超时,则调用ResponseFuture. executeInvokeCallback()方法,并将该对象从responseTable列表中删除;

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
26天前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
59 12
|
26天前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
34 2
|
2月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
47 0
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
80 2
|
4月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
77 1
|
4月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
4月前
|
消息中间件 数据安全/隐私保护 RocketMQ
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
|
4月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
67 0
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
90 5