消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。我们接下来主要介绍Pull模式
Pull模式的处理机制
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
在RocketMQ中有两种Pull方式,一种是比较原始Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。
Pull模式的使用特点
- 开发者自己维护OffsetStore。
- 自己保存消费组的offset,比如存入Redis,或调用MQ接口将其保存在Broker端。自主选择Message Queue和offset进行消息拉取。
- 用户拉去消息时,需要用户自己来决定拉去哪个队列从哪个offset开始,拉去多少消息。
相比Push的运行特点
与PUSH模式相比,PULL模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了Pull方式的编程复杂度,RocketMQ提供了调度消费服务(MQPullConsumerScheduleService),在topic的订阅发送变化(初次订阅或距上次拉取消息超时)就触发PULL方式拉取消息。
DefaultMQPullConsumer
针对于DefaultMQPullConsumer源码流程进行相关的分析,对于Push模式而言,Pull 模式比较适应于客户端拉去的速度由自己进行控制处理。而且实现的原理和复杂程度也简单了很多,我们从实现出发,进行分析对应的实现流程。
DefaultMQPullConsumer的Pull拉取模式的开发案例
指定队列模式消费对应队列的消息
java
复制代码
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); try { MessageQueue mq = new MessageQueue(); mq.setQueueId(0); mq.setTopic("lob"); mq.setBrokerName("brokerName"); long offset = 26; PullResult pullResult = consumer.pull(mq, "*", offset, 32); if (pullResult.getPullStatus().equals(PullStatus.FOUND)) { System.out.printf("%s%n", pullResult.getMsgFoundList()); consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); } } catch (Exception e) { e.printStackTrace(); } consumer.shutdown(); }
消费所有队列数据
从所有队列进行选择队列模式,并且存储offset在被本地。
java
复制代码
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName"); consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic"); for(MessageQueue mq:mqs){ try { // 获取消息的offset,指定从store中获取 long offset = consumer.fetchConsumeOffset(mq,true); while(true){ PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq,pullResult.getNextBeginOffset()); switch(pullResult.getPullStatus()){ case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.println(new String(m.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; } } } catch (Exception e) { e.printStackTrace(); } } consumer.shutdown(); // 保存上次消费的消息下标 private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) { OFFSE_TABLE.put(mq, nextBeginOffset); } // 获取上次消费的消息的下标 private static Long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if(offset != null){ return offset; } return 0l; }
fetchSubscribeMessageQueues(从指定topic中拉取所有消息队列)
根据Topic获取该Topic的所有消息队列,用于遍历消息队列,从每个消息队列中获取消息,
调用DefaultMQPullConsumer.fetchSubscribeMessageQueues(String topic)方法,根据topic获取对应的MessageQueue(即可被订阅的队列),在该方法中最终通过调用MQAdminImpl.fetchSubscribeMessageQueues(String topic)方法从NameServer获取该topic的MessageQueue。
java
复制代码
/** * @param topic Topic名称 * @return 该Topic所有的消息队列 */ @Override public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic)); }
fetchSubscribeMessageQueues底层调用
调用MQClientAPIImpl.getTopicRouteInfoFromNameServer(String topic, long timeoutMillis)方法,其中timeoutMillis=3000,该方法向NameServer发送GET_ROUTEINTO_BY_TOPIC请求码获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象;.
JAVA
复制代码
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { try { TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); if (topicRouteData != null) { // 2、遍历topicRouteData Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData); if (!mqList.isEmpty()) { return mqList; } else { throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null); } } } catch (Exception e) { throw new MQClientException( "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), e); } throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); }
fetchSubscribeMessageQueues底层调用
遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限,若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合;最后返回给MessageQueue集合
JAVA
复制代码
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) { Set<MessageQueue> mqList = new HashSet<MessageQueue>(); List<QueueData> qds = route.getQueueDatas(); for (QueueData qd : qds) { if (PermName.isReadable(qd.getPerm())) { for (int i = 0; i < qd.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); mqList.add(mq); } } } return mqList; }
消息的三种拉取模式
同步拉取消息
java
复制代码
/** * @param mq 消息队列 * @param subExpression 消息tag过滤表达式 * @param offset 消费组offset(从哪里开始拉去) * @param maxNums 一次最大拉去消息数量 * @param timeout 超时时间 * @return 存储了拉取状态以及消息 */ @Override public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout); }
异步拉取消息
java
复制代码
/** * @param mq 消息队列 * @param subExpression 消息tag过滤表达式 * @param offset 消费组offset(从哪里开始拉去) * @param maxNums 一次最大拉去消息数量 * @param timeout 超时时间 * @param pullCallback 异步回调函数 * @param timeout * @throws MQClientException * @throws RemotingException * @throws InterruptedException */ @Override public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout); }
同步阻塞拉取消息
拉取消息,若没有找到消息,则阻塞一段时间。通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容。
- maxNums=32即表示获取的最大消息个数。
- offset为该MessageQueue对象的开始消费位置,可以调用
DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
方法获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
方法
java
复制代码
/** * @param mq 消息队列 * @param subExpression tag过滤 * @param offset 消费组offset * @param maxNums 一次最大拉取数量 * @return */ @Override public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums); }
pullBlockIfNotFound 和 pull区别是: 前者在没有找到消息的时候会阻塞一段时间以便等待后续消息进入,后者则会直接返回 NOT_FOUND 。
维护消息队列的Offset
获取队列的消费Offset
java
复制代码
/** * @param mq 队列 * @param fromStore 是否从存储获取,true: 从当前服务器存储中获取,false:从远程broker获取 * @return 消费offset */ @Override public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException { return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore); }
调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取MessageQueue队列的消费进度,其中fromStore为false表示从存储端(即Broker端)获取消费进度;若fromStore为true表示从本地内存获取消费进度;
- 对于从存储端获取消费进度(即fromStore=true)的情况:
- 对于LocalFileOffsetStore对象,从本地加载offsets.json文件,然后获取该MessageQueue对象的offset值;
(即fromStore=false)对于RemoteBrokerOffsetStore对象,获取逻辑如下:
- 以MessageQueue对象的brokername从MQClientInstance. brokerAddrTable中获取Broker的地址;若没有获取到则立即调用updateTopicRouteInfoFromNameServer方法然后再次获取;
- 构造QueryConsumerOffsetRequestHeader对象,其中包括topic、consumerGroup、queueId;然后调用MQClientAPIImpl.queryConsumerOffset (String addr, QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis)方法向Broker发送QUERY_CONSUMER_OFFSET请求码,获取消费进度Offset;
- 用上一步从Broker获取的offset更新本地内存的消费进度列表数据RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap<MessageQueue, AtomicLong>变量值;
更新消费组Offset
更新消费组的Offset,注意:只会在本地内存中更新,并不会同步到远程Broker.
java
复制代码
/** * @param mq 消息队列 * @param offset 消费进度 */ @Override public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException { this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset); }