RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

本文涉及的产品
云服务器 ECS,每月免费额度280元 3个月
云服务器ECS,u1 2核4GB 1个月
简介: RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

RocketMQ为开发者提供了两种消息的消费模式,分别是PullPush,对应的实现是DefaultMQPullConsumer和DefaultMQPushConsumer;
接下来我将带大家通过以下几个方面了解这两种模式:

  • Pull和Push的使用示例
  • 跟踪源码分析两种模式的实现原理
  • RocketMQ到底是Push还是Pull呢?

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

1 Pull模式

这种模式很容易理解,就是消费者主动请求Broker去拉取一批消息,然后消费;

这种模式的好处是可以根据客户端消费能力主动获取消息量;但是弊端也比较明显,就是获取消息的时机不太好把握
,获取时间间隔小容易造成CPU浪费,时间间隔太大又会造成消费不及时。

1.1 使用示例

使用提供的DefaultMQPullConsumer这个实现,调用fetchMessageQueuesInBalance拿到该Topic下的Queue,然后调用pull()方法从Queue中指定offset获取消息

public class PullConsumer {
   
    public static void main(String[] args) throws MQClientException {
   
        // 创建Pull模式消费实例
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("test_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        // 获取该Topic下的所有Queue
        Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance("TopicTest");
        PullResult pullResult = null;
        // 从Queue中获取消息
        for (MessageQueue messageQueue : messageQueues) {
   
            long offset = this.consumeFromOffset(messageQueue);
            pullResult = consumer.pull(messageQueue, "*", offset, 32);
            switch (pullResult.getPullStatus()) {
   
                case FOUND:
                    List<MessageExt> msgs = pullResult.getMsgFoundList();
                    // 执行自定义的消费逻辑
                    this.doSomething(msgs);
                    //update offset to broker
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case OFFSET_ILLEGAL:
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case NO_NEW_MSG:
                    Thread.sleep(1);
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case NO_MATCHED_MSG:
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                default:
            }
        }
    }
}

1.2 拉取源码分析

  • 注意一下DefaultMQPullConsumer.pull()方法;
  • 这个方法会执行MQClientAPIImpl().pullMessage把请求封装为RequestCode.PULL_MESSAGE的RemotingCommand命令;
  • 然后由NettyRemotingClient发送RemotingCommand命令到Broker;
  • 最后就是Broker收到拉取请求根据请求信息把匹配的消息响应到客户端。

1.2.1 核心代码

/*
   封装请求报文RemotingCommand.createRequestCommand(RequestCode.**PULL_MESSAGE**)
 */
public class MQClientAPIImpl implements NameServerUpdateCallback {
   
    public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
   
        ...
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        ...
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        ...
    }
}
/*
   RemotingClient调用channel.writeAndFlush(request)发出拉取请求
 */
public abstract class NettyRemotingAbstract {
   
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
   
        ...
        try {
   
            ...
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
   
                ...
            });
            ...
        } 
        ...
    }
}

2 Push模式

这个模式解决了Pull模式请求时间间隔的痛点,从直观上看来就是Broker主动推送消息,这样消息消费也比较及时。

2.1 使用示例

用api提供的DefaultMQPushConsumer这个实现,首先订阅Topic及注册监听方法,然后调用start方法就可以接收消息了。

public class Consumer {
   

    public static void main(String[] args) throws MQClientException {
   
        // 创建Push模式消费实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.subscribe(TOPIC, "*");
        // 注册监听方法处理消息逻辑
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
   
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 开启
        consumer.start();
    }
}

2.2 推送源码分析

  • 在执行DefaultMQPushConsumer.start()方法后
  • 实际上会开启一个PullMessageService任务
  • 在PullMessageService这个任务中会轮询执行DefaultMQPushConsumerImpl.pullMessage
  • 跟踪pullMessage源码发现其实是在Pull模式拉取逻辑上增加一系列延迟请求,一定程度上避免短时间内无效请求
/*
   开启拉取消息任务
 */
public class MQClientInstance {
   

    public void start() throws MQClientException {
   
        ...
        // Start pull service
        this.pullMessageService.start();
        ...
    }
}
/*
   轮询执行拉取消息请求
 */
public class PullMessageService extends ServiceThread {
   
    @Override
    public void run() {
   
        while (!this.isStopped()) {
   
            try {
   
                MessageRequest messageRequest = this.messageRequestQueue.take();
                // 执行DefaultMQPushConsumerImpl.pullMessage拉取
                this.pullMessage((PullRequest) messageRequest);
            } catch (Exception e) {
   
                logger.error("Pull Message Service Run Method exception", e);
            }
        }
    }
}
/*
   同Pull模式QClientAPIImpl().pullMessage基础上进一步封装了Pull逻辑;
   在命中某些条件下执行executePullRequestLater方法延迟请求拉取,避免短时间内大量无效请求
 */
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   

    public void pullMessage(final PullRequest pullRequest) {
   

        // 通过判断各种条件下是否执行延迟处理,避免短时间内大量无效请求
        if (...) {
   
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }

        ...

        // 拉取回调逻辑,执行之前注册的registerMessageListener监听
        PullCallback pullCallback = new PullCallback() {
   
            @Override
            public void onSuccess(PullResult pullResult) {
   
                ...
            }
            @Override
            public void onException(Throwable e) {
   
                ...
            }
        };

        ...

        try {
   
            this.pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    ...
                    ...
                    pullCallback
            );
        } catch (Exception e) {
   
            // 延迟请求
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
}
  • 在Broker端,收到Consumer拉取请求后如果没有新消息时会将请求挂起一定时间,也可以避免Consumer重复无效请求
  • 此外如果CommitLog有消息产生,Broker也会主动将消息返回给之前挂起的Consumer,已达到消息消费的及时性
/*
   没有消息则挂起此次请求
 */
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
   
    @Override
    public RemotingCommand handle(final GetMessageResult getMessageResult,
                                  final RemotingCommand request,
                                  ...
                                  final Channel channel,
                                  ...
                                  RemotingCommand response) {
   

        switch (response.getCode()) {
   
            switch (response.getCode()) {
   
                case ResponseCode.SUCCESS:
                    ...
                    // 有消息则写回Channel
                case ResponseCode.PULL_NOT_FOUND: // 没有消息
                    ...
                    // 挂起请求
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    return null;
                ...
            }
        }
    }
}
/*
   CommitLog有新消息主动通知Consumer来拉取消息
 */
public class NotifyMessageArrivingListener implements MessageArrivingListener {
   
    @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
                         long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
   
        // 通知挂起的客户端来拉取消息
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
                msgStoreTime, filterBitMap, properties);
        ...
    }
}

最后

目前为止通过源码跟踪我们可以发现,RocketMq的Push模式的实现和我们通常了解的实现上有一定的差异,它是由由Consumer主要来发起拉取请求去Broker拉取,
但是Rocketmq通过对拉取逻辑的一系列封装,以及采用长轮询机制让Consumer请求挂起避免短轮询无效请求,同时Broker在消息产生时也会及时通知挂起的Consumer来拉取消息,最终达到了Push的效果。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2天前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
2月前
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
29 0
|
3月前
|
Java Maven
SpringBoot集成RabbitMQ-三种模式的实现
SpringBoot集成RabbitMQ-三种模式的实现
82 0
|
2月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
20 1
|
3月前
|
消息中间件 Apache RocketMQ
电子好书发您分享《Apache RocketMQ 源码解析》
电子好书发您分享《Apache RocketMQ 源码解析》
27 1
|
2月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
24 0
|
2月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
28 0
|
2月前
|
消息中间件 网络架构
【面试问题】什么是 MQ topic 交换器(模式匹配) ?
【1月更文挑战第27天】【面试问题】什么是 MQ topic 交换器(模式匹配) ?
|
2月前
|
物联网 Go 网络性能优化
MQTT协议本身支持多种消息收发模式
MQTT协议本身支持多种消息收发模式【1月更文挑战第24天】【1月更文挑战第120篇】
23 3
|
2月前
|
消息中间件 Java Spring
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费
48 0

相关产品

  • 弹性容器实例
  • 云服务器 ECS
  • 轻量应用服务器