RabbitMq的一些概念

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: RabbitMq的一些概念

早在一年多以前,我就已经开始试图在项目中异步化一些业务,例如系统的行为日志。当时选择的就是大名鼎鼎的RabbitMQ,这也是调查过不少同类产品后最终的选择,直到今天也无怨无悔~

最喜欢的一点并不是它的业务模型丰富,而是它支持的语言很全面,从php到java,c/c++,甚至nodejs,都可以很方便的使用(虽然c/c++下的库文档真的很少~)!

虽然我一直记着它的好,但悲剧的是早先调研它时学习的很多概念,时至今日已经忘不少了~所以感觉还是要写一篇博文记录下来,以备后用!

那就一个一个来吧:

Message acknowledgment

当队列中的任务被你的消费者进程取走后,如果消费者处理中挂掉了,那这个任务也就丢失了(虽然可能只做了一半)!很多情况下这并不是我们可以接受的,所以 Ack 机制出现了,它给了我们一个很优的解决方案: 当消费者连接断开后,如果RabbitMQ没有收到消费者针对该任务的Ack,那么RabbitMQ就会认为该消费者挂掉了,同时会把该任务分给其他消费者。

这里还要注意的是:任务是没有超时限制的,也就是说只要消费者的连接有效,RabbitMQ就不会把任务再发送给其他消费者,这样可以保证某些需要耗时很久的任务正常执行。

尤其注意的是,千万不要忘记发送Ack,否则RabbitMQ会不停的把任务重复发送并且一直积累,直到崩溃~可以通过下面这个命令来查看当前没有收到Ack的消息个数:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

Message durability

一般情况下当RabbitMQ退出或崩溃,那么队列和任务将会丢失,这当然是不能容忍的。RabbitMQ提供了持久化方案,只需要把队列声明成持久的即可(注意,RabbitMQ并不允许修改当前已存在队列的持久性)。

此外我们还需要把消息也设置成持久化的,这些都有对应的属性参数让我们来设置。

但注意,RabbitMQ并不能百分之百保证消息一定不会丢失,因为为了提升性能,RabbitMQ会把消息暂存在内存缓存中,直到达到阀值才会批量持久化到磁盘,也就是说如果在持久化到磁盘之前RabbitMQ崩溃了,那么就会丢失一小部分数据,这对于大多数场景来说并不是不可接受的,如果确实需要保证任务绝对不丢失,那么应该使用事务机制。

Round-robin dispatching

这个机制是RabbitMQ最常用业务模型中的。一般我们选择异步任务,除了降低模块间的依赖外,还有一个理由就是有效规避大并发负载,尤其是针对http。

举个场景,网站上的找回密码功能,系统会向对应用户的邮箱发送修改密码的连接。如果是同步流程的话,大量用户同时请求该功能,由于发送邮件比较耗时,那么你的web服务器会持续等待,这个时候就可能会被大量涌入的请求搞死,即便是没死,也会大大影响其响应速度。

那么如果使用异步的话,我们可以把找回密码的请求都存到队列里,然后由后台进程逐步完成邮件发送的任务,web服务器就可以快速响应用户。

好,说了这么多,那么到底 Round-robin 是做什么的?

上图中我们有两个消费者(C1,C2),它们同时从队列中领取任务并执行,默认情况下RabbitMQ会按照顺序依次把消息发送给C1和C2,这样可以保证每个消费者领到的任务个数都是相同的,这种分配任务的方式就是Round-robin。

任务耗时不均匀的情况下,这种方式可能并不是最佳的。

Fair dispatch

上面说到了,由于默认情况下RabbitMQ不会去管任务到底是什么类型的(特指其耗时情况),它只会一味的按照 Round-robin 的算法把队列中的消息平均分配给所有消费者。还是上面的那个图,我们假设队列中的任务很奇葩,奇数任务是耗时久的,偶数任务是耗时低的,那么C1可能一直很忙,而C2则几乎没事儿可做!

听上去很不公平是吧?这就是因为 Round-robin 机制并不考虑每个消费者当前正在处理的任务数(换句话说,就是当前该消费者仍没有Ack的任务数)。

我们可以设置 prefetch 来避免上述情况,该设置可以告诉RabbitMQ:直到该消费者处理完当前指定数目的任务之前,不要再给消费者分配新任务(这是依赖统计该消费者的Ack情况来实现的,可见两者必须欧同时开启哦)。

如果当前所有消费者都在忙,那么任务将会阻塞在队列中,你可能需要增加消费者数量来避免大量任务被阻塞。

Exchanges

上图中的那种架构并不是RabbitMQ推荐的,为什么这么说呢?RabbitMQ核心思想是生产者绝对不应该直接将任务投递到目标队列中,换句话说,生产者根本不需要知道任务最终应该会投递到哪里。

取而代之的,生产者只需要把任务发送到一个 Exchange 中即可

Exchange 非常容易理解,它负责根据映射关系和投递模型把任务投递到队列中,有效的投递模型有:direct,topic,headers,fanout。官方提供的例子中就已经把这些模型讲的很清楚了。

剩下要做的就是把Exchange和Queue绑定到一起了,如果向一个没有绑定任何队列的Exchange发送任务,则任务都会被丢弃。

你可以通过下面的命令来查看绑定关系:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

Routing

其实这个机制是建立在 Exchanges 上的,有了Route,我们就可以实现根据类别,让Exchange来选择性的分发任务给匹配的队列。

要做到这点,只需要在为Exchange绑定queue时设置一个 routingKey 即可。注意,fanout类型的exchanges会忽略这个值,毕竟这种类型的exchange要实现的是广播机制。

如上图,我们这次使用的是 direct 投递模型的Exchage,这种模型下的路由逻辑非常简单:根据绑定时声明的routingKey来分发任务。

另外值得一提的是,绑定非常灵活,不仅可以像上图那样为一个队列绑定多个不同的routingKey,也可以为Exchage绑定多个队列同时监听相同的routingKey(这等同于fanout模型)。

Topic exchange

我承认,可能排版上有点乱,因为按道理说这个概念应该合并到 Exchanges 中,但是由于它依赖 Routing ,所以我决定采用官方提供的学习步骤。

我们已经了解过direct,fanout两种投递模型。那么topic到底又是什么呢?

简单来说,topic只是为routingKey设置了一个规则(任意单词来描述主题,以"."分割为不同层级,长度不能超过255位),有点命名空间的味道,这里称之为主题可能更加合适一些。

在规则中还提供了两个关键字:

  1. *:可以匹配任意1个单词;
  2. #:可以匹配任意0个或多个单词。

有点正则的味道,不过确实在direct模型的基础上进一步提升了灵活性。举个例子,如果我们用这么一个routingKey:*.love.*,那么投递任务时,任何这种模式主题的任务都会投递到对应队列,例如:everyone.love.kazaff。再如:kazaff.#,这意味着会匹配kazaff.me.is.cool,也会匹配kazaff.me,等等。

如果发送任务用的routingKey不能匹配声明的模式,那么任务就会被丢弃。

你可以把模式只定义为#来模仿fanout模型,也可以把模式定义为不包含*和\#的具体字符串来模仿direct模型。

Message properties

在AMQP协议中为每个任务预定义了14个属性,大多数属性都非常少用,常用的4个如下:

  1. deliveryMode:标识任务的持久性;
  2. contentType:用于描述任务数据的MIME-TYPE,例如application/json;
  3. replyTo:用于命名一个callback队列;
  4. correlationId:用于标识RPC任务的请求与响应的配对编号。

可想到为每一次RPC请求都创建一个回调队列是非常低效的。更高效的做法是为每一个发出RPC请求的客户端创建一个回调队列,但这又产生了一个新问题:如何知道回调队列中的响应是对应哪一个请求的呢?

这就是 correlationId 属性存在的意义,我们只要为每个请求设置一个唯一的值,我们就能在从回调队列中取到响应数据后根据correlationId找到对应的请求。

如果我们收到一个未知的correlationId响应,只需要忽略它既可。你可能会想,怎么可能收到未知的响应?确实,这种情况发生的概率不高

假设S从rpc_queue中取得RPC任务后,进行处理,然后把响应发送给reply_to队列,此时S挂了,它还没来得及向rpc_queue发送ack!但是你知道的,其实整个RPC已经可以算完成了!

这个时候S重启完毕,它会再次取到刚才的那个RPC任务,再次处理,再次把结果发送给reply_to队列,这次它挺了下来没死机,并把ack发送到rpc_queue。但是,C端早已经在S第一次死机之前就拿到结果了,第二次发来的响应任务C自然找不到对应的correlationId。我这么说,你懂了么?


当目前为止,我已经把我认为重要的概念都提到了,有啥问题,私下讨论吧!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 中间件 数据安全/隐私保护
RabbitMQ 的核心概念
RabbitMQ 的核心概念
42 2
|
3月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
4月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
122 2
|
4月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
131 1
|
4月前
|
消息中间件 存储 RocketMQ
【RocketMQ系列十】RocketMQ的核心概念说明
【RocketMQ系列十】RocketMQ的核心概念说明
69 1
|
5月前
|
消息中间件 存储 中间件
【主流技术】聊一聊消息队列 RocketMQ 的基本结构与概念
2.6Broker 代理服务器(Broker)是消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 2.7Pull Consumer 拉取式消费(Pull Consumer)是 Consumer 消费的一种类型,也是默认的类型。下游应用系统通常主动调用 Consumer 的拉消息方法从 Broke r服务器拉消息,即主动权由下游应用控制。一旦获取了批量消息,应用就会启动消费过程。
|
5月前
|
消息中间件 存储 物联网
RocketMQ基础概念
RocketMQ基础概念
59 1
|
6月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
6月前
|
消息中间件 Java API
RabbitMQ(基础概念, 简单使用)(下)
RabbitMQ(基础概念, 简单使用)
53 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ(基础概念, 简单使用)(中)
RabbitMQ(基础概念, 简单使用)
37 0