RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ为开发者提供了两种消息的消费模式,分别是Pull和Push,对应的实现是DefaultMQPullConsumer和DefaultMQPushConsumer;
接下来我将带大家通过以下几个方面了解这两种模式:
- Pull和Push的使用示例
- 跟踪源码分析两种模式的实现原理
- RocketMQ到底是Push还是Pull呢?
Tip
:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
1 Pull模式
这种模式很容易理解,就是消费者主动请求Broker去拉取一批消息,然后消费;
这种模式的好处是可以根据客户端消费能力来主动获取消息量;但是弊端也比较明显,就是获取消息的时机不太好把握
,获取时间间隔小容易造成CPU浪费,时间间隔太大又会造成消费不及时。
1.1 使用示例
使用提供的DefaultMQPullConsumer这个实现,调用fetchMessageQueuesInBalance拿到该Topic下的Queue,然后调用pull()方法从Queue中指定offset获取消息
public class PullConsumer {
public static void main(String[] args) throws MQClientException {
// 创建Pull模式消费实例
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("test_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
// 获取该Topic下的所有Queue
Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance("TopicTest");
PullResult pullResult = null;
// 从Queue中获取消息
for (MessageQueue messageQueue : messageQueues) {
long offset = this.consumeFromOffset(messageQueue);
pullResult = consumer.pull(messageQueue, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
// 执行自定义的消费逻辑
this.doSomething(msgs);
//update offset to broker
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case OFFSET_ILLEGAL:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_NEW_MSG:
Thread.sleep(1);
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_MATCHED_MSG:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
default:
}
}
}
}
1.2 拉取源码分析
- 注意一下DefaultMQPullConsumer.pull()方法;
- 这个方法会执行MQClientAPIImpl().pullMessage把请求封装为RequestCode.PULL_MESSAGE的RemotingCommand命令;
- 然后由NettyRemotingClient发送RemotingCommand命令到Broker;
- 最后就是Broker收到拉取请求根据请求信息把匹配的消息响应到客户端。
1.2.1 核心代码
/*
封装请求报文RemotingCommand.createRequestCommand(RequestCode.**PULL_MESSAGE**)
*/
public class MQClientAPIImpl implements NameServerUpdateCallback {
public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
...
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
...
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
...
}
}
/*
RemotingClient调用channel.writeAndFlush(request)发出拉取请求
*/
public abstract class NettyRemotingAbstract {
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
...
try {
...
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
...
});
...
}
...
}
}
2 Push模式
这个模式解决了Pull模式请求时间间隔的痛点,从直观上看来就是Broker主动推送消息,这样消息消费也比较及时。
2.1 使用示例
用api提供的DefaultMQPushConsumer这个实现,首先订阅Topic及注册监听方法,然后调用start方法就可以接收消息了。
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 创建Push模式消费实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.subscribe(TOPIC, "*");
// 注册监听方法处理消息逻辑
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 开启
consumer.start();
}
}
2.2 推送源码分析
- 在执行DefaultMQPushConsumer.start()方法后
- 实际上会开启一个PullMessageService任务
- 在PullMessageService这个任务中会轮询执行DefaultMQPushConsumerImpl.pullMessage
- 跟踪pullMessage源码发现其实是在Pull模式拉取逻辑上增加一系列延迟请求,一定程度上避免短时间内无效请求
/*
开启拉取消息任务
*/
public class MQClientInstance {
public void start() throws MQClientException {
...
// Start pull service
this.pullMessageService.start();
...
}
}
/*
轮询执行拉取消息请求
*/
public class PullMessageService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
try {
MessageRequest messageRequest = this.messageRequestQueue.take();
// 执行DefaultMQPushConsumerImpl.pullMessage拉取
this.pullMessage((PullRequest) messageRequest);
} catch (Exception e) {
logger.error("Pull Message Service Run Method exception", e);
}
}
}
}
/*
同Pull模式QClientAPIImpl().pullMessage基础上进一步封装了Pull逻辑;
在命中某些条件下执行executePullRequestLater方法延迟请求拉取,避免短时间内大量无效请求
*/
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
// 通过判断各种条件下是否执行延迟处理,避免短时间内大量无效请求
if (...) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
...
// 拉取回调逻辑,执行之前注册的registerMessageListener监听
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
...
}
@Override
public void onException(Throwable e) {
...
}
};
...
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
...
...
pullCallback
);
} catch (Exception e) {
// 延迟请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
}
- 在Broker端,收到Consumer拉取请求后如果没有新消息时会将请求挂起一定时间,也可以避免Consumer重复无效请求
- 此外如果CommitLog有消息产生,Broker也会主动将消息返回给之前挂起的Consumer,已达到消息消费的及时性
/*
没有消息则挂起此次请求
*/
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
@Override
public RemotingCommand handle(final GetMessageResult getMessageResult,
final RemotingCommand request,
...
final Channel channel,
...
RemotingCommand response) {
switch (response.getCode()) {
switch (response.getCode()) {
case ResponseCode.SUCCESS:
...
// 有消息则写回Channel
case ResponseCode.PULL_NOT_FOUND: // 没有消息
...
// 挂起请求
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
return null;
...
}
}
}
}
/*
CommitLog有新消息主动通知Consumer来拉取消息
*/
public class NotifyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
// 通知挂起的客户端来拉取消息
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
...
}
}
最后
目前为止通过源码跟踪我们可以发现,RocketMq的Push模式的实现和我们通常了解的实现上有一定的差异,它是由由Consumer主要来发起拉取请求去Broker拉取,
但是Rocketmq通过对拉取逻辑的一系列封装,以及采用长轮询机制让Consumer请求挂起避免短轮询无效请求,同时Broker在消息产生时也会及时通知挂起的Consumer来拉取消息,最终达到了Push的效果。
Tip
:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq