消息拉取介绍|学习笔记

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: 快速学习消息拉取介绍

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息拉取介绍】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12495


消息拉取介绍

 

消息拉取介绍

消息消费拉取的过程,消息消费有两种模式,一种是广播模式,一种是集群模式。广播模式比较简单不存在什么负载均衡,就是每一个消费者需要拉取订阅主题下面的所有队列的消息,那么重点讲解的是集群模式下的一个负载均衡的一个过程。那集群模式之下,同一个消费者组内有多个消费者,然后同一个主题又有多个消息队列,那么消费者是通过负载均衡的方式去消费这个消息,它的做法是一个消息队列只能同一时间被一个消费者去消费,而一个消费者可以去消费多个消息队列,这是它的负载均衡的一个基本思想。

以 PullMessageSerivce来看消息消费实现的机制,为什么要去研究 PullMessageSerivce ?就是刚才所讲的消费者在启动的时候,会看到 mQclientFactory cline 的客户端,它在启动的时候有PullMessageSerivce 拉取消息的服务的启动,服务的启动是做了什么事情?启动之后会调用它的 run 方法,通过PullMessageSerivce 拉取消息的方式去把消息拉下来,那怎么去拉?

while (!this.isStopped())[

try (

PullRequestpul Request = this.pullRequestQueue.take();

this.pullMessage(pullRequest);

] catch(InterruptedException ignored)

]catch (Exception e) (

log.error("PullMessage Service Run Method exception", e);

首先是从拉取队列当中去请求对象,拉取的 request 请求对象之后处理拉取的请求,那么怎么去处理?

private void pullMessage(final PullRequest pullRequest){

FinalMQConsumerInner consumer = this.moclientFactory.selectConsumer(pullRequest.getConsumerGroup());

if (consumer !m null) {

DefaultMQPushConsumerImpl impl =(DefaultMQPushConsumerImp1) consumer;

impl.pullMessage(pullRequest);

)else (

log.warn("No matchedconsumer for the PullRequest (), drop it",pullRequest);

发现它首先从请求对象当中去找到当前的 consumer ,把消费者先去找到,找到之后把消费者发现变成了推送的消费者,然后再去调用 pullMessage 的方法。

image.png

整个过程总结,PullMessageSerivce  随着 MQClientlnstance  的启动而启动起来的,启动起来之后它就会去源源不断的去取出队列当中拉取数据的请求。然后开始处理请求,在处理请求的时候先去拿到消费者,通过消费者的pullMessage  把消息请求进行处理,去拿到消息并且推给消费方。

private String consumerGroup; //消费者组

private MessageQueue messageQueue; //待拉取消息队列

private ProcessQueue processQueue; //消息处理队列

private long nextoffset; //待拉取的Messagebueue偏移量

private boolean lockedFirst = false; //是否被锁定

Pull Request其中包括当前的消费者组、待拉取消息队列、消息处理队列、待拉取的 MessageQueue偏移量以及是否被锁定。

重点研究process queen 从名字上能够看到这是处理的队列,那它和消息队列到底有什么关系?其实processqueen 就是 MessageQueue 在消费端的一个重现或者是一个快照,PullMessageSerivce 从消息服务器去拉取32条消息,默认每次拉取32条消息,按照消息队列的偏移量按顺序存放到process queen当中,然后PullMessageSerivce  就会将消息提交到消费池当中,让它去进行对应的处理,它就可以再一次去拉取新的消息了,它这么做的好处就是能实现数据的转换,并且清空之后再可以从远端拉取新的消息。

所以这里有几个东西需要额外注意,其中PullMessageSerivce 是一个拉取消息的服务。通过这个是从服务端去拉取这个消息,拉取消息拉下来之后把这个消息放到process queen当中,然后让消息消费方去从这里去取出消息进行一个处理。这是拉取消息了之后PullMessageSerivce 和process queen这两个重要的一个作用。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
相关文章
|
1月前
|
人工智能 搜索推荐 程序员
从程序员到UP主:一个结构化的B站视频文案生成方案
专为技术人打造的B站视频文案生成指令,结构化拆解内容创作流程。输入主题、类型等信息,结合AI一键生成含分镜脚本、标题、SEO及互动设计的完整方案,提升创作效率与视频质量。
319 2
|
安全 Java
【深入理解同步器AQS】
【深入理解同步器AQS】
432 0
|
XML Java 数据格式
如果Spring中有两个ID相同的Bean,会报错吗?
有位粉丝被 问到这样一个问题,说在Spring中,如果有两个ID相同的Bean,会不会报错?如果报错,会在哪个阶段报错? 这个问题也要分析具体的情况,才能完整的回答。我从三个方面来回答你的问题吧。
617 0
|
1月前
|
缓存 NoSQL 关系型数据库
MySQL 与 Redis 如何保证双写一致性?
我是小假 期待与你的下一次相遇 ~
347 7
|
2月前
|
消息中间件 canal 缓存
缓存与数据库一致性终极指南:从入门到放弃?不,到精通!上
凌晨被投诉惊醒?缓存与数据库不一致是常见难题。本文详解五大解决方案:旁路缓存、双删策略、消息队列补偿、Binlog监听与版本号控制,结合场景分析一致性、性能与复杂度的权衡,助你选型不踩坑。
|
8月前
|
JavaScript Java 开发者
Spring事务失效,常见的情况有哪些?
本文总结了Spring事务失效的7种常见情况,包括未启用事务管理功能、方法非public类型、数据源未配置事务管理器、自身调用问题、异常类型错误、异常被吞以及业务和事务代码不在同一线程中。同时提供了两种快速定位事务相关Bug的方法:通过查看日志(设置为debug模式)或调试代码(在TransactionInterceptor的invoke方法中设置断点)。文章帮助开发者更好地理解和解决Spring事务中的问题。
342 7
|
消息中间件 存储 监控
消息队列通信的优缺点
【10月更文挑战第29天】消息队列通信具有诸多优点,如解耦性强、异步通信、缓冲削峰等,能够有效地提高系统的灵活性、可扩展性和稳定性。但同时也存在一些缺点,如系统复杂性增加、性能开销、数据一致性挑战和实时性受限等。在实际应用中,需要根据具体的业务需求和场景,权衡其优缺点,合理地选择和使用消息队列通信机制,以实现系统的高效运行和优化。
|
测试技术 API 数据库
gRPC Status 状态码枚举类型 介绍文档 (更新 gRPC Status 状态码 实操 代码技巧介绍)
gRPC Status 状态码枚举类型 介绍文档 (更新 gRPC Status 状态码 实操 代码技巧介绍)
361 5
|
11月前
|
消息中间件 存储 负载均衡
2024消息队列“四大天王”:Rabbit、Rocket、Kafka、Pulsar巅峰对决
本文对比了 RabbitMQ、RocketMQ、Kafka 和 Pulsar 四种消息队列系统,涵盖架构、性能、可用性和适用场景。RabbitMQ 以灵活路由和可靠性著称;RocketMQ 支持高可用和顺序消息;Kafka 专为高吞吐量和低延迟设计;Pulsar 提供多租户支持和高可扩展性。性能方面,吞吐量从高到低依次为
3964 1
|
存储 机器学习/深度学习 数据采集
HyDE
HyDE
549 5