DefaultMQPushConsumer
使用系统控制读取操作的DefaultMQPushConsumer可以自动调用传入的处理方法来处理收到的消息。通过设置各种参数和传入处理消息的函数,使用DefaultMQPushConsumer的主要目的是方便配置和处理消息。在收到消息后,系统会自动保存Offset,并且如果加入了新的DefaultMQPushConsumer,系统会自动做负载均衡。
RocketMQ的消息模式
RocketMQ提供Clustering和Broadcasting两种消息模式。
- Clustering模式下,ConsumerGroup内每个Consumer只消费所订阅消息的一部分,而所有Consumer消费内容合在一起构成Topic内容,实现负载均衡。
- Broadcasting模式下,同一ConsumerGroup内每个Consumer都接收所订阅Topic的全部消息,每个消息分发给多个Consumer消费。
推模式的的案例代码
使用 DefaultMQPushConsumer
可以自动控制读取操作,收到消息后会自动调用传入的处理方法进行处理,并且自动保存 Offset。主要需要设置好各种参数以及传入处理消息的函数。当加入新的 DefaultMQPushConsumer
后,系统会自动进行负载均衡。
java
复制代码
public class DefaultMQPushConsumerSample { public static void main(String[] args) throws MQClientException { // Consumer 的 GroupName 用于把多个 Consumer 组织到一起,提高并发处理能力,GroupName 需要和消息模式( MessageModel) 配合使用。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // NameServer 的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如“ip1:port;ip2:port;ip3:port”。 consumer.setNamesrvAddr("127.0.0.1:9876"); /* Specify where to start in case the specified Consumer group is a brand new one. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); /* Subscribe one more more Topics to consume. */ // Topic 名称用来标识消息类型,需要提前创建。 // 如果不需要消费某个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤, // 比如:Consumer.subscribe("TopicTest","tag1||tag2||tag3"),表示这个 Consumer 要消费“TopicTest”下带有tag1或tag2或tag3的消息 // ( Tag 是在发送消息时设置的标签)。在填写 Tag 参数的位置, 用 null 或者“*” 表示要消费这个 Topic 的所有消息。 consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of Messages fetched from brokers. */ consumer.registerMessageListener( (MessageListenerConcurrently) (msgs, context) -> { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); System.out.println(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); /* * Launch the Consumer instance.*/ consumer.start(); } }
DefaultMQPushConsumer 的处理流程
DefaultMQPushConsumer 的主要功能是由 DefaultMQPushConsumerImpl 类实现的。消息的处理逻辑在 pullMessage 函数中的 PullCallBack 中完成。PullCallBack 函数里有一个 switch 语句,根据从 Broker 返回的消息类型进行相应的处理。
com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
java
复制代码
PullCallback pullCallback = new PullCallback() { public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch(pullResult.getPullStatus()) { case FOUND: // 省略代码 break; case NO_NEW_MSG: // 省略代码 break; case NO_MATCHED_MSG: // 省略代码 break; case OFFSET_ILLEGAL: // 省略代码 } } } public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) { DefaultMQPushConsumerImpl.this.log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 3000L); } };
长轮询
DefaultMQPushConsumer
中通过使用“PullRequest
”以“长轮询”(long polling)方式实现了 Push 效果。长轮询方式既保留了 Pull 的好处,又具有 Push 方式的实时性。在长轮询中,客户端的请求权力仍然掌握在 Consumer
手中,即使 Broker 有大量消息积压,也不会主动推送给 Consumer
。长轮询方式的局限性是需要占用资源来维护客户端的请求,因此适合在消息队列等客户端连接数可控的场景中使用。
Push 方式是 Server 端接收到消息后主动将消息推送给 Client 端,这种方式实时性强。然而对于提供队列服务的 Server 来说,用 Push 方式主动推送会增加 Server 端的工作量,从而影响 Server 的性能;而且 Client 的处理能力不同,Client 的状态也不受 Server 控制,如果 Client 不能及时处理 Server 推送过来的消息,就会出现潜在问题。
Pull 方式是 Client 端循环地从 Server 端拉取消息,主动权在 Client 手中,自己拉取到一定数量的消息后,再进行处理。Pull 方式的问题在于循环拉取消息的间隔不好设定,间隔太短会导致一种“忙等”状态,浪费资源;而每个 Pull 的时间间隔太长会导致 Server 端有更多的消息到来而没有被及时处理。
长轮询方式通过 Client 端和 Server 端的合作,既保留了 Pull 的优点,又在保证实时性方面达到了目的。
com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
java
复制代码
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = this.computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl(). pullMessage(brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis)
的作用是设置 Broker 的最长阻塞时间,其默认设置是 15 秒,但需注意仅当 Broker 没有新消息时才会被阻塞,如果有新消息则会立即返回。
“长轮询” 服务端代码
从 Broker 的源码中可以看出,服务端在接收到新的消息请求后,并不会急于返回,而是通过一个循环状态不断地查看队列中是否有新消息。每次查看状态时,会暂停一段时间(默认为 5 秒),然后再次进行检查。在默认情况下,当 Broker 没有新的消息时,第三次检查时,若等待时间超过 Request 中设定的 Broker-SuspendMaxTimeMillis,会返回一个空结果。
java
复制代码
if (this.brokerController.getBrokerConfig().isLongPollingEnable()){ this.waitForRunning( 5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { Log. info("[ NOTIFYME] check hold request cost {} ms.", costTime); }
在等待的过程中,一旦 Broker 收到新的消息,就会立即调用 notifyMessageArriving 函数并返回请求结果。"长轮询"的核心是,Broker 会暂时地保留客户端请求,在这段时间内如果有新的消息到达,则可以不用创建新的连接,而是利用现有的连接立刻返回消息给 Consumer。
messageQueue和processQueue
PullRequest中定义了messageQueue和processQueue。
processQueue
processQueue是一个快照类,在PushConsumer运行时,每个MessageQueue都会有一个对应的ProcessQueue对象,用于保存该MessageQueue消息处理状态的快照。
ProcessQueue对象主要包含一个TreeMap和一个读写锁。TreeMap以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到但还未被处理的消息;读写锁控制着多个线程对TreeMap对象的并发访问。
在pull逻辑中,PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,如果有任何一个值超过了设置的大小,则会隔一段时间再拉取消息,以达到流量控制的目的。此外,ProcessQueue还可以辅助实现顺序消费的逻辑。相应代码如下:
com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
PushConsumer会判断获取但还未处理的消息个数
java
复制代码
long size = processQueue.getMsgCount().get(); if (size > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, 50L); if (this.flowControlTimes1++ % 1000L == 0L) { this.log.warn("the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, this.flowControlTimes1}); } }
消息总大小、Offset的跨度
java
复制代码
if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, 50L); if (this.flowControlTimes2++ % 1000L == 0L) { this.log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, this.flowControlTimes2}); } return; } }
ProcessQueue对象主要包含一个TreeMap和一个读写锁
java
复制代码
else { if (!processQueue.isLocked()) { this.executePullRequestLater(pullRequest, 3000L); this.log.info("pull message later because not locked in broker, {}", pullRequest); return; } if (!pullRequest.isLockedFirst()) { long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); this.log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", new Object[]{pullRequest, offset, brokerBusy}); if (brokerBusy) { this.log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } }
内容总结
DefaultMQPushConsumer使用长轮训技术,让consumer不断向broker端请求消息,如果没有可消费的消息,则阻塞一段时间,等待broker推送消息给consumer。具体实现过程为,先每隔一段时间从broker获取消息进行消费,如果没有需要消费的消息,则调用poll函数向远程broker获取最新的消息,最长等待时间为Consumer的maxTimeConsumeConitnusly属性,如果超时时间到达还没有新的消息,则返回null。这种方式实现实时更新消息且对于broker的开销较小,但会导致consumer不断发起请求,增加网络负载和调用次数。因此需要合理设置长轮询的超时时间。