前提介绍
在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。
DefaultMQPullConsumer
DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。
总体流程执行
DefaultMQPullConsumer使用例子
public class MQPullConsumer { private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName"); consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.start(); // 从指定topic中拉取所有消息队列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic"); for(MessageQueue mq:mqs){ try { // 获取消息的offset,指定从store中获取 long offset = consumer.fetchConsumeOffset(mq,true); while(true){ PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq,pullResult.getNextBeginOffset()); switch(pullResult.getPullStatus()){ case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.println(new String(m.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; } } } catch (Exception e) { e.printStackTrace(); } } consumer.shutdown(); } // 保存上次消费的消息下标 private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) { OFFSE_TABLE.put(mq, nextBeginOffset); } // 获取上次消费的消息的下标 private static Long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if(offset != null){ return offset; } return 0l; } } 复制代码
- 消费者启动:consumer.start();
- 获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest"); //遍历队列 for(MessageQueue mq:mqs){ try { //获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取,true表示从broker获取 long offset = consumer.fetchConsumeOffset(mq,true); while(true){ //第二个参数表示可以消费哪些tag的消息 //第三个参数表示从哪个位移开始消费消息 //第四个参数表示一次最大拉多少个消息 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); } 复制代码
DefaultMQPullConsumer的总体流程
启动DefaultMQPullConsumer是通过调用start()方法完成的
DefaultMQPullConsumer拉取源码分析
分析下DefaultMQPullConsumer拉取消息的流程
consumer.fetchSubscribeMessageQueues("order-topic") 复制代码
从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic"); 复制代码
核心源码分析
fetchSubscribeMessageQueues()
- 通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。
- 每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。
- 向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { try { TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); if (topicRouteData != null) { // 2、遍历topicRouteData Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData); if (!mqList.isEmpty()) { return mqList; } else { throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null); } } } catch (Exception e) { throw new MQClientException( "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), e); } throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); } 复制代码
遍历过程TopicRouteData
遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限, 若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合; 最后返回给MessageQueue集合
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) { Set<MessageQueue> mqList = new HashSet<MessageQueue>(); List<QueueData> qds = route.getQueueDatas(); for (QueueData qd : qds) { if (PermName.isReadable(qd.getPerm())) { for (int i = 0; i < qd.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); mqList.add(mq); } } } return mqList; } 复制代码
consumer.fetchConsumeOffset
通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。
DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore) 复制代码
fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。 这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。
pullBlockIfNotFound拉取信息
rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。
不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } 复制代码
获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。
- 参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;
- 参数2:需要过滤用的表达式 ;
- 参数3:偏移量即消费队列的进度 ;
- 参数4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block) 复制代码
DefaultMQPullConsumerImpl.pullSyncImpl的实现过程
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.isRunning(); //检查入参是否合法 if (null == mq) { throw new MQClientException("mq is null", null); } if (offset < 0) { throw new MQClientException("offset < 0", null); } if (maxNums <= 0) { throw new MQClientException("maxNums <= 0", null); } //更新再平衡服务的数据,因为再平衡服务不起作用,所以更新数据没有效果 this.subscriptionAutomatically(mq.getTopic()); int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); //计算超时时间,如果调用的是pullBlockIfNotFound方法,block参数就是true,否则就是false long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); //调用PullAPIWrapper从broker拉取消息, //pullKernelImpl方法里面构建PullMessageRequest请求对象 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq,//队列 subscriptionData.getSubString(),//消息的过滤规则 subscriptionData.getExpressionType(), isTagType ? 0L : subscriptionData.getSubVersion(), offset,//拉取消息的位移 maxNums,//建议broker一次性返回最大消息个数,默认是32个 sysFlag, 0,//设置的提交位移,可以看到永远都是0,所以broker无法记录有效位移,需要程序自己记录控制提交位移 this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis,//超时时间 CommunicationMode.SYNC, null//回调逻辑为null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); //If namespace is not null , reset Topic without namespace. this.resetTopic(pullResult.getMsgFoundList()); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return pullResult; } 复制代码
检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*" this.subscriptionAutomatically(mq.getTopic()); // 构建标志位,逻辑或运算|= int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData; try { //以请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回 subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); } long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; // 从broker中拉取消息 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0, this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis, CommunicationMode.SYNC, null ); // 对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中 this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return pullResult; } 复制代码
Push和Pull的操作对比
- push-优点:及时性、服务端统一处理实现方便
- push-缺点:容易造成堆积、负载性能不可控
- pull-优点:获得消息状态方便、负载均衡性能可控
- pull-缺点:及时性差
使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。