【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

简介: 【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

承接【【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)】

pullBlockIfNotFound方法

通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置,可以调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)方法。

java

复制代码

public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}

接下面我来分析DefaultMQPullConsumerImpl.pullSyncImpl源码实现如下:

pullSyncImpl方法的定义

java

复制代码

private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
参数说明
  • MessageQueue mq:需要进行拉去的消息队列。
  • String subExpression:tag的标签值
  • long offset:消息数据偏移量
  • int maxNums:拉去的消息的最大数量
  • boolean block:是否进行阻塞拉去
  • long timeout:拉取数据的超时时间
参数数据校验

如下所示,校验对应的MessageQueue对象、offset偏移量和拉取的最大数据量是否合法。

java

复制代码

this.makeSureStateOK();
        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }
        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
检查MessageQueue对象的topic是否存在

检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData> 变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression) 方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*"。

java

复制代码

this.subscriptionAutomatically(mq.getTopic());
构建标志位,逻辑或运算|=

java

复制代码

int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
构建SubscriptionData对象(buildSubscriptionData)

请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回。

java

复制代码

SubscriptionData subscriptionData;
try {
            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                mq.getTopic(), subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
、}
从broker中拉取消息(pullAPIWrapper.pullKernelImpl)

调用PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法从Broker拉取消息内容。

java

复制代码

long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            0L,
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );

对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中。

pullKernelImpl底层调用机制

调用PullAPIWrapper.processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData)方法对拉取消息的响应结果进行处理,主要是消息反序列化。

java

复制代码

public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }
        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            // 设置broker的最长阻塞时间,默认是15秒,broker只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回null,如果有消息会立即返回
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);
            return pullResult;
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
  • 看到首先代码调用MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) 方法查找Broker地址,其中onlyThisBroker=false,表示若指定的brokerId未获取到地址则获取其他BrokerId的地址也行。
  • 方法中根据brokerName和brokerId参数从MQClientInstance.brokerAddrTable->ConcurrentHashMap<, HashMap变量中获取对应的Broker地址,若获取不到则从brokerName下面的Map列表中找其他地址返回即可。
  • 判断是否为空,若在上一步未获取到Broker地址,topic参数调用MQClientInstance.updateTopicRouteInfoFromNameServer(String topic)方法,然后在执行操作MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker) ,直到获取到Broker地址为止;
  • 根据topic参数值从MQClientInstance.topicRouteTable: ConcurrentHashMapTopicRouteData>变量中获取TopicRouteData对象,
  • Broker地址为参数从该TopicRouteData对象的filterServerTable:HashMap变量中获取该Broker下面的所有Filter服务器地址列表;
  • 若该地址列表不为空,则随机选择一个Filter服务器地址返回;否则向调用层抛出异常,该pullKernelImpl方法结束;
获取对应的broker服务

若获取的Broker地址是备用Broker,则将标记位sysFlag的第1个字节置为0,即在消费完之后不提交 消费进度;

检查标记位sysFlag的第4个字节(即SubscriptionData. classFilterMode)是否为1;若等于1,则调用PullAPIWrapper.computPullFromWhichFilterServer(String topic, String brokerAddr)方法获取Filter服务器地址。大致逻辑如下:

java

复制代码

int sysFlagInner = sysFlag;
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
初始化PullMessageRequestHeader对象

调用MQClientAPIImpl.pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法向Broker地址或者Filter地址发送PULL_MESSAGE请求信息。

构建PullMessageRequestHeader对象,其中queueOffset变量值等于入参offset,设置broker的最长阻塞时间,默认是15秒,broker只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回null,如果有消息会立即返回。

java

复制代码

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

获取的Filter服务器发送PULL_MESSAGE请求信息,否则向Broker发送PULL_MESSAGE请求信息。

pullMessage方法处理

java

复制代码

PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);
  return pullResult;
代码实现机制(同步+异步)

在MQClientAPIImpl.pullMessage方法中,根据入参communicationMode的值分为异步拉取和同步拉取方式两种。

java

复制代码

public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }
        return null;
    }

无论是异步方式拉取还是同步方式拉取,在发送拉取请求之前都会构造一个ResponseFuture对象,以请求消息的序列号为key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>变量中,对该变量有几种情况会处理:

  1. 发送失败后直接删掉responseTable变量中的相应记录;
  2. 收到响应消息之后,会以响应消息中的序列号(由服务端根据请求消息的序列号原样返回)从responseTable中查找ResponseFuture对象,并设置该对象的responseCommand变量。若是同步发送会唤醒等待响应的ResponseFuture.waitResponse方法;若是异步发送会调用ResponseFuture.executeInvokeCallback()方法完成回调逻辑处理;
pullMessageSync调用操作(对于同步拉取)

同步拉取方式,调用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法。大致步骤如下:

  1. 调用RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:

java

复制代码

@Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
  • 获取Broker地址的Channel信息。根据broker地址从RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>变量中获取ChannelWrapper对象并返回该对象的Channel变量;若没有ChannelWrapper对象则与broker地址建立新的连接并将连接信息存入channelTables变量中,便于下次使用;
  • 若NettyRemotingClient.rpcHook:RPCHook变量不为空(该变量在应用层初始化DefaultMQPushConsumer或者DefaultMQPullConsumer对象传入该值),则调用RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
NettyRemotingAbstract.invokeSyncImpl(同步请求)

调用NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,该方法的逻辑如下:

  1. 使用请求的序列号(opaue)、超时时间初始化ResponseFuture对象;并将该ResponseFuture对象存入NettyRemotingAbstract.responseTable: ConcurrentHashMap变量中;
  2. 调用Channel.writeAndFlush(Object msg)方法将请求对象RemotingCommand发送给Broker;然后调用addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法添加内部匿名类:该内部匿名类实现了ChannelFutureListener接口的operationComplete方法,在发送完成之后回调该监听类的operationComplete方法,在该方法中,首先调用ChannelFuture. isSuccess()方法检查是否发送成功,若成功则置ResponseFuture对象的sendRequestOK等于true并退出此回调方法等待响应结果;若不成功则置ResponseFuture对象的sendRequestOK等于false,然后从NettyRemotingAbstract.responseTable中删除此请求序列号(opaue)的记录,置ResponseFuture对象的responseCommand等于null,并唤醒ResponseFuture.waitResponse(long timeoutMillis)方法的等待;
  3. 调用ResponseFuture.waitResponse(long timeoutMillis)方法等待响应结果;在发送失败或者收到响应消息或者超时的情况下会唤醒该方法返回ResponseFuture.responseCommand变量值;
  4. 若上一步返回的responseCommand值为null,则抛出异常:若ResponseFuture.sendRequestOK为true,则抛出RemotingTimeoutException异常,否则抛出RemotingSendRequestException异常;
  5. 若上一步返回的responseCommand值不为null,则返回responseCommand变量值;
结果返回
  1. 若NettyRemotingClient.rpcHook: RPCHook变量不为空,则调用RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;

2、以上一步的返回值RemotingCommand对象为参数调用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法将返回对象解析并封装成PullResultExt对象然后返回给调用者,响应消息的结果状态转换如下:

  • 若RemotingCommand对象的Code等于SUCCESS,则PullResultExt.pullStatus=FOUND;
  • 若RemotingCommand对象的Code等于PULL_NOT_FOUND,则PullResultExt.pullStatus= NO_NEW_MSG;
  • 若RemotingCommand对象的Code等于PULL_RETRY_IMMEDIATELY,则PullResultExt.pullStatus= NO_MATCHED_MSG;
  • 若RemotingCommand对象的Code等于PULL_OFFSET_MOVED,则PullResultExt.pullStatus= OFFSET_ILLEGAL;
定时请求响应结果列表

在NettyRemotingClient.start()启动时,也会初始化定时任务,该定时任务每隔1秒定期扫描responseTable列表,遍历该列表中的ResponseFuture对象,检查等待响应是否超时,若超时,则调用ResponseFuture. executeInvokeCallback()方法,并将该对象从responseTable列表中删除;

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
28 0
|
15天前
|
消息中间件 传感器 网络协议
阿里云MQTT简介和使用流程
以下是内容的摘要: 该文主要介绍了在阿里云上搭建 MQTT 服务器的步骤。首先,需要注册阿里云账号并进行实名认证。然后,购买阿里云 MQTT 实例,选择合适的类型、地域、连接和消息限制。接着,创建产品和设备,命名并上线,获取 MQTT 连接的相关信息,包括 ProductKey、DeviceName 和 DeviceSecret。通过提供的 MQTT.fx 工具,设置 MQTT 客户端连接参数,包括 Broker 地址、端口、用户名和密码。最后,使用 MQTT.fx 测试连接,实现数据的上报和接收,验证 MQTT 服务器的配置是否成功。
|
15天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
34 0
RabbitMQ入门指南(九):消费者可靠性
|
1月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
4月前
|
消息中间件 NoSQL 数据库
一文讲透消息队列RocketMQ实现消费幂等
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。
一文讲透消息队列RocketMQ实现消费幂等
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
76 0
|
3月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
46 0
|
3月前
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
123 0
|
23天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
17 0