消息拉取介绍|学习笔记

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
简介: 快速学习消息拉取介绍

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息拉取介绍】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12495


消息拉取介绍

 

消息拉取介绍

消息消费拉取的过程,消息消费有两种模式,一种是广播模式,一种是集群模式。广播模式比较简单不存在什么负载均衡,就是每一个消费者需要拉取订阅主题下面的所有队列的消息,那么重点讲解的是集群模式下的一个负载均衡的一个过程。那集群模式之下,同一个消费者组内有多个消费者,然后同一个主题又有多个消息队列,那么消费者是通过负载均衡的方式去消费这个消息,它的做法是一个消息队列只能同一时间被一个消费者去消费,而一个消费者可以去消费多个消息队列,这是它的负载均衡的一个基本思想。

以 PullMessageSerivce来看消息消费实现的机制,为什么要去研究 PullMessageSerivce ?就是刚才所讲的消费者在启动的时候,会看到 mQclientFactory cline 的客户端,它在启动的时候有PullMessageSerivce 拉取消息的服务的启动,服务的启动是做了什么事情?启动之后会调用它的 run 方法,通过PullMessageSerivce 拉取消息的方式去把消息拉下来,那怎么去拉?

while (!this.isStopped())[

try (

PullRequestpul Request = this.pullRequestQueue.take();

this.pullMessage(pullRequest);

] catch(InterruptedException ignored)

]catch (Exception e) (

log.error("PullMessage Service Run Method exception", e);

首先是从拉取队列当中去请求对象,拉取的 request 请求对象之后处理拉取的请求,那么怎么去处理?

private void pullMessage(final PullRequest pullRequest){

FinalMQConsumerInner consumer = this.moclientFactory.selectConsumer(pullRequest.getConsumerGroup());

if (consumer !m null) {

DefaultMQPushConsumerImpl impl =(DefaultMQPushConsumerImp1) consumer;

impl.pullMessage(pullRequest);

)else (

log.warn("No matchedconsumer for the PullRequest (), drop it",pullRequest);

发现它首先从请求对象当中去找到当前的 consumer ,把消费者先去找到,找到之后把消费者发现变成了推送的消费者,然后再去调用 pullMessage 的方法。

image.png

整个过程总结,PullMessageSerivce  随着 MQClientlnstance  的启动而启动起来的,启动起来之后它就会去源源不断的去取出队列当中拉取数据的请求。然后开始处理请求,在处理请求的时候先去拿到消费者,通过消费者的pullMessage  把消息请求进行处理,去拿到消息并且推给消费方。

private String consumerGroup; //消费者组

private MessageQueue messageQueue; //待拉取消息队列

private ProcessQueue processQueue; //消息处理队列

private long nextoffset; //待拉取的Messagebueue偏移量

private boolean lockedFirst = false; //是否被锁定

Pull Request其中包括当前的消费者组、待拉取消息队列、消息处理队列、待拉取的 MessageQueue偏移量以及是否被锁定。

重点研究process queen 从名字上能够看到这是处理的队列,那它和消息队列到底有什么关系?其实processqueen 就是 MessageQueue 在消费端的一个重现或者是一个快照,PullMessageSerivce 从消息服务器去拉取32条消息,默认每次拉取32条消息,按照消息队列的偏移量按顺序存放到process queen当中,然后PullMessageSerivce  就会将消息提交到消费池当中,让它去进行对应的处理,它就可以再一次去拉取新的消息了,它这么做的好处就是能实现数据的转换,并且清空之后再可以从远端拉取新的消息。

所以这里有几个东西需要额外注意,其中PullMessageSerivce 是一个拉取消息的服务。通过这个是从服务端去拉取这个消息,拉取消息拉下来之后把这个消息放到process queen当中,然后让消息消费方去从这里去取出消息进行一个处理。这是拉取消息了之后PullMessageSerivce 和process queen这两个重要的一个作用。

相关实践学习
通过ACR快速部署网站应用
本次实验任务是在云上基于ECS部署Docker环境,制作网站镜像并上传至ACR镜像仓库,通过容器镜像运行网站应用,网站运行在Docker容器中、网站业务数据存储在Mariadb数据库中、网站文件数据存储在服务器ECS云盘中,通过公网地址进行访问。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
1月前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
19 1
|
2月前
|
消息中间件 存储 Apache
MQ产品使用合集之RocketMQ如果配置所有的ip,有些namesrv挂了的话,消息就发送失败了,消费也是失败的如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 中间件 Kafka
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
97 1
|
2月前
|
消息中间件 Java
RabbitMQ中的消息发布-订阅模式是什么?如何实现?
RabbitMQ中的消息发布-订阅模式是什么?如何实现?
61 0
|
2月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
48 0
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
439 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
983 1
|
消息中间件 RocketMQ
简述RocketMQ消息拉取过程【一】
简述RocketMQ消息拉取过程【一】
684 0
|
消息中间件 负载均衡 RocketMQ
消息拉取介绍|学习笔记
快速学习消息拉取介绍
62 0
消息拉取介绍|学习笔记
|
消息中间件 RocketMQ 开发者
拉取消息长轮询机制|学习笔记
快速学习拉取消息长轮询机制
258 0
拉取消息长轮询机制|学习笔记

热门文章

最新文章

  • 1
    流量控制系统,用正则表达式提取汉字
    25
  • 2
    Redis09-----List类型,有序,元素可以重复,插入和删除快,查询速度一般,一般保存一些有顺序的数据,如朋友圈点赞列表,评论列表等,LPUSH user 1 2 3可以一个一个推
    26
  • 3
    Redis08命令-Hash类型,也叫散列,其中value是一个无序字典,类似于java的HashMap结构,Hash结构可以将对象中的每个字段独立存储,可以针对每字段做CRUD
    25
  • 4
    Redis07命令-String类型字符串,不管是哪种格式,底层都是字节数组形式存储的,最大空间不超过512m,SET添加,MSET批量添加,INCRBY age 2可以,MSET,INCRSETEX
    27
  • 5
    S外部函数可以访问函数内部的变量的闭包-闭包最简单的用不了,闭包是内层函数+外层函数的变量,简称为函数套函数,外部函数可以访问函数内部的变量,存在函数套函数
    23
  • 6
    Redis06-Redis常用的命令,模糊的搜索查询往往会对服务器产生很大的压力,MSET k1 v1 k2 v2 k3 v3 添加,DEL是删除的意思,EXISTS age 可以用来查询是否有存在1
    30
  • 7
    Redis05数据结构介绍,数据结构介绍,官方网站中看到
    21
  • 8
    JS字符串数据类型转换,字符串如何转成变量,+号只要有一个是字符串,就会把另外一个转成字符串,- * / 都会把数据转成数字类型,数字型控制台是蓝色,字符型控制台是黑色,
    19
  • 9
    JS数组操作---删除,arr.pop()方法从数组中删除最后一个元素,并返回该元素的值,arr.shift() 删除第一个值,arr.splice()方法,删除指定元素,arr.splice,从第一
    19
  • 10
    定义好变量,${age}模版字符串,对象可以放null,检验数据类型console.log(typeof str)
    19