rocketMq - 并发消费过程

简介: rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析并发消费问题。并发消费需要理解的几个核心点:并发消费的消息拉取,并发消费的消息重试,并发消息的ack机制,消费进度的持久化,这篇分享会就这几个问题分解展开。

rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析并发消费问题。

并发消费需要理解的几个核心点:并发消费的消息拉取,并发消费的消息重试,并发消息的ack机制,消费进度的持久化,这篇分享会就这几个问题分解展开。

其他逻辑

    1、consumer会定期向broker同步ack消息偏移量,也就是已经消费的位置。

    2、极端情况下consumer会因为一个消息一直失败导致ack消息偏移量无法前进,但是因为会有定时任务去清楚过期消息,所以ack进度正常便宜。

并发消费整体流程


img_e42b201eec0d4110aa1d143b9cc41d57.png
并发消费过程

说明:

    1、Rebalance负责生成pullRequest放置到pullRequestQueue当中。

    2、PullMessageService负责消费pullRequest来完成数据的拉取。

    3、数据拉取后生成ConsumeRequest对象投递到consumeExecutor的线程池当中

    4、ConsumeRequest是一个线程实例,负责消费拉取的消息。

    5、消费消息成功就从ConsumeRequest的ProcessQueue中删除,消费失败就投递到broker的重试队列中,重试次数和延迟粒度在broker端处理。

    6、consumeRequest内部维持的processQueue作为一个TreeMap对象可以维持消息的有序性,用于判断消费进度。

    7、pullRequest在消费完以后还是再次投递到pullRequestQueue当中。


pullRequest执行过程

img_560ef18c51c66cc839a19629abc8037e.png
consumer消费入口

说明:参见PullMessageService类

    1、单线程循环消费pullRequest。


img_4a31898b068227407da9775a52aa329b.png
消费流速控制

说明:参见PullMessageService类

    1、消费过程中进行一些状态判断以及流速控制


img_545f763656fced7cb5bb1dc98d61376d.png
有序消费和无须消费处理逻辑

说明:参见DefaultMQPushConsumerImpl类

    1、区分有序消费和无须消费

    2、无序消费会判断消费偏移量是否差别过大


img_5468c88753eafbbd9cbed0b6d0964f2b.png
拉取消息的回调函数

说明:参见DefaultMQPushConsumerImpl类

    1、处理拉取消息的后续操作

    2、处理完以后再次投递pullRequest请求


img_f639317df1c327af445dc0e2dc270d47.png
消息拉取执行部分

说明:参见PullAPIWrapper类


img_4cb41a297b1753b9ec3e8a79484f3817.png
真正执行拉取的地方

说明:参见PullAPIWrapper类


img_693f104adadfb1fb65b10e22bb032b66.png
处理拉取的消息结果

说明:参见ConsumeMessageConcurrentlyService类。

    1、拉取消息成功后设置下一次拉取的偏移量。

    2、更新拉取的消息到processQueue当中。

    3、再次投递pullRequest发起下一次拉取。


img_0d89fbb795e34cb622fd3a019590cdaa.png
处理拉取消息的分配处理

说明:参见ConsumeMessageConcurrentlyService类

    1、分一次能够处理完成和分多次能够处理完成。


img_ddafe2724d7a12b24de4ea306cb4bbd5.png
consumer消费对象的核心

说明:

    1、processQueue是待处理消息保存位置,里面核心数据结构之一为TreeMap

    2、messageQueue就是这个ConsumeRequest负责处理的messageQueue


img_bb99831e3b2de4ae7d1c92116661c17d.png
回调函数消费并进行结果处理

说明:参见ConsumeMessageConcurrentlyService类

    1、consumer消费拉取消息的逻辑及后续处理


img_bcf2d78875ac7d09f017667b344a5dab.png
持久化消费位移

说明:参见ConsumeMessageConcurrentlyService类

    1、消费成功就删除所有拉取的消息


img_6e8b8775104891ca759270533bd5883c.png
broker端存储重试消息

说明:参见SendMessageProcessor类

    1、处理逻辑在consumerSendMsgBack方法中

    2、里面涉及到延迟粒度和重试次数的设置

    3、消息是被投递到延迟队列当中的


img_428bbcdfa7fb86576a7f01e4bde68770.png
定期持久化消费位移

说明:参见MQClientInstance类

    1、在persistAllConsumerOffset定期持久化消费偏移量

    2、消费偏移量由ConsumerRequest请求在处理的过程中变更的


img_64270fa0f4b301ff94a2ad621c2f35be.png
重新发送拉取请求

说明:参见DefaultMQPushConsumerImpl类

    1、处理没有从broker拉取消息的过程

    2、再次投递pullRequest请求

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
49 2
|
3月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
86 0
|
6月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
94 0
|
消息中间件 安全 Go
动态订阅时 rocketmq-client-go 代码有map并发bug
动态订阅时 rocketmq-client-go 代码有map并发bug
65 2
|
消息中间件 测试技术
RabbitMQ消费端并发和限流设置
RabbitMQ消费端并发和限流设置
1079 0
|
消息中间件 缓存 监控
Rocketmq并发和顺序消费的失败重试机制
Rocketmq并发和顺序消费的失败重试机制
|
存储 消息中间件 API
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
|
消息中间件 存储 Java
阿里二面:RocketMQ 消费失败了,怎么处理?
阿里二面:RocketMQ 消费失败了,怎么处理?
522 2
阿里二面:RocketMQ 消费失败了,怎么处理?
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2745 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 存储 Cloud Native
云原生中间件RocketMQ-消费者消费模式之广播模式
云原生中间件RocketMQ-消费者消费模式之广播模式
887 7
下一篇
无影云桌面