RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

本文涉及的产品
无影云电脑企业版,4核8GB 120小时 1个月
无影云电脑个人版,1个月黄金款+200核时
资源编排,不限时长
简介: RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

RocketMQ源码(二)消息消费的模式到底是Push还是Pull?

RocketMQ为开发者提供了两种消息的消费模式,分别是PullPush,对应的实现是DefaultMQPullConsumer和DefaultMQPushConsumer;
接下来我将带大家通过以下几个方面了解这两种模式:

  • Pull和Push的使用示例
  • 跟踪源码分析两种模式的实现原理
  • RocketMQ到底是Push还是Pull呢?

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

1 Pull模式

这种模式很容易理解,就是消费者主动请求Broker去拉取一批消息,然后消费;

这种模式的好处是可以根据客户端消费能力主动获取消息量;但是弊端也比较明显,就是获取消息的时机不太好把握
,获取时间间隔小容易造成CPU浪费,时间间隔太大又会造成消费不及时。

1.1 使用示例

使用提供的DefaultMQPullConsumer这个实现,调用fetchMessageQueuesInBalance拿到该Topic下的Queue,然后调用pull()方法从Queue中指定offset获取消息

public class PullConsumer {
   
    public static void main(String[] args) throws MQClientException {
   
        // 创建Pull模式消费实例
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("test_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        // 获取该Topic下的所有Queue
        Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance("TopicTest");
        PullResult pullResult = null;
        // 从Queue中获取消息
        for (MessageQueue messageQueue : messageQueues) {
   
            long offset = this.consumeFromOffset(messageQueue);
            pullResult = consumer.pull(messageQueue, "*", offset, 32);
            switch (pullResult.getPullStatus()) {
   
                case FOUND:
                    List<MessageExt> msgs = pullResult.getMsgFoundList();
                    // 执行自定义的消费逻辑
                    this.doSomething(msgs);
                    //update offset to broker
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case OFFSET_ILLEGAL:
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case NO_NEW_MSG:
                    Thread.sleep(1);
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                case NO_MATCHED_MSG:
                    consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                    break;
                default:
            }
        }
    }
}

1.2 拉取源码分析

  • 注意一下DefaultMQPullConsumer.pull()方法;
  • 这个方法会执行MQClientAPIImpl().pullMessage把请求封装为RequestCode.PULL_MESSAGE的RemotingCommand命令;
  • 然后由NettyRemotingClient发送RemotingCommand命令到Broker;
  • 最后就是Broker收到拉取请求根据请求信息把匹配的消息响应到客户端。

1.2.1 核心代码

/*
   封装请求报文RemotingCommand.createRequestCommand(RequestCode.**PULL_MESSAGE**)
 */
public class MQClientAPIImpl implements NameServerUpdateCallback {
   
    public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
   
        ...
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        ...
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        ...
    }
}
/*
   RemotingClient调用channel.writeAndFlush(request)发出拉取请求
 */
public abstract class NettyRemotingAbstract {
   
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
   
        ...
        try {
   
            ...
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
   
                ...
            });
            ...
        } 
        ...
    }
}

2 Push模式

这个模式解决了Pull模式请求时间间隔的痛点,从直观上看来就是Broker主动推送消息,这样消息消费也比较及时。

2.1 使用示例

用api提供的DefaultMQPushConsumer这个实现,首先订阅Topic及注册监听方法,然后调用start方法就可以接收消息了。

public class Consumer {
   

    public static void main(String[] args) throws MQClientException {
   
        // 创建Push模式消费实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.subscribe(TOPIC, "*");
        // 注册监听方法处理消息逻辑
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
   
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 开启
        consumer.start();
    }
}

2.2 推送源码分析

  • 在执行DefaultMQPushConsumer.start()方法后
  • 实际上会开启一个PullMessageService任务
  • 在PullMessageService这个任务中会轮询执行DefaultMQPushConsumerImpl.pullMessage
  • 跟踪pullMessage源码发现其实是在Pull模式拉取逻辑上增加一系列延迟请求,一定程度上避免短时间内无效请求
/*
   开启拉取消息任务
 */
public class MQClientInstance {
   

    public void start() throws MQClientException {
   
        ...
        // Start pull service
        this.pullMessageService.start();
        ...
    }
}
/*
   轮询执行拉取消息请求
 */
public class PullMessageService extends ServiceThread {
   
    @Override
    public void run() {
   
        while (!this.isStopped()) {
   
            try {
   
                MessageRequest messageRequest = this.messageRequestQueue.take();
                // 执行DefaultMQPushConsumerImpl.pullMessage拉取
                this.pullMessage((PullRequest) messageRequest);
            } catch (Exception e) {
   
                logger.error("Pull Message Service Run Method exception", e);
            }
        }
    }
}
/*
   同Pull模式QClientAPIImpl().pullMessage基础上进一步封装了Pull逻辑;
   在命中某些条件下执行executePullRequestLater方法延迟请求拉取,避免短时间内大量无效请求
 */
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   

    public void pullMessage(final PullRequest pullRequest) {
   

        // 通过判断各种条件下是否执行延迟处理,避免短时间内大量无效请求
        if (...) {
   
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }

        ...

        // 拉取回调逻辑,执行之前注册的registerMessageListener监听
        PullCallback pullCallback = new PullCallback() {
   
            @Override
            public void onSuccess(PullResult pullResult) {
   
                ...
            }
            @Override
            public void onException(Throwable e) {
   
                ...
            }
        };

        ...

        try {
   
            this.pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    ...
                    ...
                    pullCallback
            );
        } catch (Exception e) {
   
            // 延迟请求
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
}
  • 在Broker端,收到Consumer拉取请求后如果没有新消息时会将请求挂起一定时间,也可以避免Consumer重复无效请求
  • 此外如果CommitLog有消息产生,Broker也会主动将消息返回给之前挂起的Consumer,已达到消息消费的及时性
/*
   没有消息则挂起此次请求
 */
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
   
    @Override
    public RemotingCommand handle(final GetMessageResult getMessageResult,
                                  final RemotingCommand request,
                                  ...
                                  final Channel channel,
                                  ...
                                  RemotingCommand response) {
   

        switch (response.getCode()) {
   
            switch (response.getCode()) {
   
                case ResponseCode.SUCCESS:
                    ...
                    // 有消息则写回Channel
                case ResponseCode.PULL_NOT_FOUND: // 没有消息
                    ...
                    // 挂起请求
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    return null;
                ...
            }
        }
    }
}
/*
   CommitLog有新消息主动通知Consumer来拉取消息
 */
public class NotifyMessageArrivingListener implements MessageArrivingListener {
   
    @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
                         long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
   
        // 通知挂起的客户端来拉取消息
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
                msgStoreTime, filterBitMap, properties);
        ...
    }
}

最后

目前为止通过源码跟踪我们可以发现,RocketMq的Push模式的实现和我们通常了解的实现上有一定的差异,它是由由Consumer主要来发起拉取请求去Broker拉取,
但是Rocketmq通过对拉取逻辑的一系列封装,以及采用长轮询机制让Consumer请求挂起避免短轮询无效请求,同时Broker在消息产生时也会及时通知挂起的Consumer来拉取消息,最终达到了Push的效果。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
6天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
38 3
|
2月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
75 12
|
2月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
40 2
|
7月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
53 0
|
3月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
54 0
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
86 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
87 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式