承接【【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)】
pullBlockIfNotFound方法
通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置,可以调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
方法获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
方法。
java
复制代码
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); }
接下面我来分析DefaultMQPullConsumerImpl.pullSyncImpl
源码实现如下:
pullSyncImpl方法的定义
java
复制代码
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
参数说明
- MessageQueue mq:需要进行拉去的消息队列。
- String subExpression:tag的标签值
- long offset:消息数据偏移量
- int maxNums:拉去的消息的最大数量
- boolean block:是否进行阻塞拉去
- long timeout:拉取数据的超时时间
参数数据校验
如下所示,校验对应的MessageQueue对象、offset偏移量和拉取的最大数据量是否合法。
java
复制代码
this.makeSureStateOK(); if (null == mq) { throw new MQClientException("mq is null", null); } if (offset < 0) { throw new MQClientException("offset < 0", null); } if (maxNums <= 0) { throw new MQClientException("maxNums <= 0", null); }
检查MessageQueue对象的topic是否存在
检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>
变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)
方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*"。
java
复制代码
this.subscriptionAutomatically(mq.getTopic());
构建标志位,逻辑或运算|=
java
复制代码
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
构建SubscriptionData对象(buildSubscriptionData)
请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回。
java
复制代码
SubscriptionData subscriptionData; try { subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); 、}
从broker中拉取消息(pullAPIWrapper.pullKernelImpl)
调用PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法从Broker拉取消息内容。
java
复制代码
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0, this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis, CommunicationMode.SYNC, null );
对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中。
pullKernelImpl底层调用机制
调用PullAPIWrapper.processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData)方法对拉取消息的响应结果进行处理,主要是消息反序列化。
java
复制代码
public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); } if (findBrokerResult != null) { { // check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); } } int sysFlagInner = sysFlag; if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); } PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); // 设置broker的最长阻塞时间,默认是15秒,broker只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回null,如果有消息会立即返回 requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult; } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
- 看到首先代码调用
MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker)
方法查找Broker地址,其中onlyThisBroker=false,表示若指定的brokerId未获取到地址则获取其他BrokerId的地址也行。 - 方法中根据brokerName和brokerId参数从MQClientInstance.brokerAddrTable->ConcurrentHashMap<, HashMap变量中获取对应的Broker地址,若获取不到则从brokerName下面的Map列表中找其他地址返回即可。
- 判断是否为空,若在上一步未获取到Broker地址,topic参数调用
MQClientInstance.updateTopicRouteInfoFromNameServer(String topic)
方法,然后在执行操作MQClientInstance.findBrokerAddressInSubscribe(String brokerName ,long brokerId,boolean onlyThisBroker)
,直到获取到Broker地址为止;
- 根据topic参数值从MQClientInstance.topicRouteTable: ConcurrentHashMapTopicRouteData>变量中获取TopicRouteData对象,
- Broker地址为参数从该TopicRouteData对象的filterServerTable:HashMap变量中获取该Broker下面的所有Filter服务器地址列表;
- 若该地址列表不为空,则随机选择一个Filter服务器地址返回;否则向调用层抛出异常,该pullKernelImpl方法结束;
获取对应的broker服务
若获取的Broker地址是备用Broker,则将标记位sysFlag的第1个字节置为0,即在消费完之后不提交 消费进度;
检查标记位sysFlag的第4个字节(即SubscriptionData. classFilterMode)是否为1;若等于1,则调用PullAPIWrapper.computPullFromWhichFilterServer(String topic, String brokerAddr)方法获取Filter服务器地址。大致逻辑如下:
java
复制代码
int sysFlagInner = sysFlag; if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); }
初始化PullMessageRequestHeader对象
调用MQClientAPIImpl.pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)
方法向Broker地址或者Filter地址发送PULL_MESSAGE请求信息。
构建PullMessageRequestHeader对象,其中queueOffset变量值等于入参offset,设置broker的最长阻塞时间,默认是15秒,broker只有在没有消息的时候才会阻塞,如果阻塞超过设定时间会返回null,如果有消息会立即返回。
java
复制代码
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType);
获取的Filter服务器发送PULL_MESSAGE请求信息,否则向Broker发送PULL_MESSAGE请求信息。
pullMessage方法处理
java
复制代码
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult;
代码实现机制(同步+异步)
在MQClientAPIImpl.pullMessage方法中,根据入参communicationMode的值分为异步拉取和同步拉取方式两种。
java
复制代码
public PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; }
无论是异步方式拉取还是同步方式拉取,在发送拉取请求之前都会构造一个ResponseFuture对象,以请求消息的序列号为key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>变量中,对该变量有几种情况会处理:
- 发送失败后直接删掉responseTable变量中的相应记录;
- 收到响应消息之后,会以响应消息中的序列号(由服务端根据请求消息的序列号原样返回)从responseTable中查找ResponseFuture对象,并设置该对象的responseCommand变量。若是同步发送会唤醒等待响应的ResponseFuture.waitResponse方法;若是异步发送会调用ResponseFuture.executeInvokeCallback()方法完成回调逻辑处理;
pullMessageSync调用操作(对于同步拉取)
同步拉取方式,调用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)
方法。大致步骤如下:
- 调用RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:
java
复制代码
@Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { this.rpcHook.doBeforeRequest(addr, request); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
- 获取Broker地址的Channel信息。根据broker地址从RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>变量中获取ChannelWrapper对象并返回该对象的Channel变量;若没有ChannelWrapper对象则与broker地址建立新的连接并将连接信息存入channelTables变量中,便于下次使用;
- 若NettyRemotingClient.rpcHook:RPCHook变量不为空(该变量在应用层初始化DefaultMQPushConsumer或者DefaultMQPullConsumer对象传入该值),则调用RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
NettyRemotingAbstract.invokeSyncImpl(同步请求)
调用NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,该方法的逻辑如下:
- 使用请求的序列号(opaue)、超时时间初始化ResponseFuture对象;并将该ResponseFuture对象存入NettyRemotingAbstract.responseTable: ConcurrentHashMap变量中;
- 调用Channel.writeAndFlush(Object msg)方法将请求对象RemotingCommand发送给Broker;然后调用addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法添加内部匿名类:该内部匿名类实现了ChannelFutureListener接口的operationComplete方法,在发送完成之后回调该监听类的operationComplete方法,在该方法中,首先调用ChannelFuture. isSuccess()方法检查是否发送成功,若成功则置ResponseFuture对象的sendRequestOK等于true并退出此回调方法等待响应结果;若不成功则置ResponseFuture对象的sendRequestOK等于false,然后从NettyRemotingAbstract.responseTable中删除此请求序列号(opaue)的记录,置ResponseFuture对象的responseCommand等于null,并唤醒ResponseFuture.waitResponse(long timeoutMillis)方法的等待;
- 调用ResponseFuture.waitResponse(long timeoutMillis)方法等待响应结果;在发送失败或者收到响应消息或者超时的情况下会唤醒该方法返回ResponseFuture.responseCommand变量值;
- 若上一步返回的responseCommand值为null,则抛出异常:若ResponseFuture.sendRequestOK为true,则抛出RemotingTimeoutException异常,否则抛出RemotingSendRequestException异常;
- 若上一步返回的responseCommand值不为null,则返回responseCommand变量值;
结果返回
- 若NettyRemotingClient.rpcHook: RPCHook变量不为空,则调用RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;
2、以上一步的返回值RemotingCommand对象为参数调用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法将返回对象解析并封装成PullResultExt对象然后返回给调用者,响应消息的结果状态转换如下:
- 若RemotingCommand对象的Code等于SUCCESS,则PullResultExt.pullStatus=FOUND;
- 若RemotingCommand对象的Code等于PULL_NOT_FOUND,则PullResultExt.pullStatus= NO_NEW_MSG;
- 若RemotingCommand对象的Code等于PULL_RETRY_IMMEDIATELY,则PullResultExt.pullStatus= NO_MATCHED_MSG;
- 若RemotingCommand对象的Code等于PULL_OFFSET_MOVED,则PullResultExt.pullStatus= OFFSET_ILLEGAL;
定时请求响应结果列表
在NettyRemotingClient.start()启动时,也会初始化定时任务,该定时任务每隔1秒定期扫描responseTable列表,遍历该列表中的ResponseFuture对象,检查等待响应是否超时,若超时,则调用ResponseFuture. executeInvokeCallback()方法,并将该对象从responseTable列表中删除;