一篇文章把RabbitMQ、RocketMQ、Kafka三元归一(一)

简介: 一篇文章把RabbitMQ、RocketMQ、Kafka三元归一(一)

RabbitMQ

RabbitMQ各组件的功能

  • Broker :一个RabbitMQ实例就是一个Broker
  • Virtual Host :虚拟主机。相当于MySQL的DataBase ,一个Broker上可以存在多个vhost,vhost之间相互隔离。每个vhost都拥有自己的队列、交换机、绑定和权限机制。vhost必须在连接时指定,默认的vhost是/。
  • Exchange :交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue :消息队列,用来保存消息直到发送给消费者。它是消息的容器。一个消息可投入一个或多个队列。
  • Banding :绑定关系,用于 消息队列和交换机之间的关联 。通过路由键( Routing Key )将交换机和消息队列关联起来。
  • Channel :管道,一条双向数据流通道。不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了管道的概念,以复用一条TCP连接。
  • Connection :生产者/消费者 与broker之间的TCP连接。
  • Publisher :消息的生产者。
  • Consumer :消息的消费者。
  • Message :消息,它是由消息头和消息体组成。消息头则包括 Routing-KeyPriority (优先级)等

微信图片_20220907141157.jpg

RabbitMQ的多种交换机类型

Exchange 分发消息给 Queue 时, Exchange 的类型对应不同的分发策略,有3种类型的 Exchange :DirectFanoutTopic

  • Direct :消息中的 Routing Key 如果和 Binding 中的 Routing Key 完全一致, Exchange 就会将消息分发到对应的队列中。
  • Fanout :每个发到 Fanout 类型交换机的消息都会分发到所有绑定的队列上去。Fanout交换机没有 Routing Key 。它在三种类型的交换机中转发消息是最快的
  • Topic :Topic交换机通过模式匹配分配消息,将 Routing Key 和某个模式进行匹配。它只能识别两个 通配符#*# 匹配0个或多个单词, * 匹配1个单词。

TTL

TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共2种。

  • 在消息发送时进行指定 。通过配置消息体的 Properties ,可以指定当前消息的过期时间。
  • 在创建Exchange时指定 。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。

生产者的消息确认机制

Confirm机制:

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接受应答,用来确认这条消息是否正常的发送到了Broker,这种方式也是 消息的可靠性投递的核心保障!

如何实现Confirm确认消息?

微信图片_20220907141228.jpg

  1. 在channel上开启确认模式channel.confirmSelect()
  2. 在channel上开启监听 :addConfirmListener ,监听成功和失败的处理结果,根据具体的结果对消息进行重新发送或记录日志处理等后续操作。

Return消息机制:

Return Listener 用于处理一些不可路由的消息

我们的消息生产者,通过指定一个Exchange和Routing,把消息送达到某一个队列中去,然后我们的消费者监听队列进行消息的消费处理操作。

但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达消息,就需要使用到Returrn Listener

基础API中有个关键的配置项 Mandatory :如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。

同样,通过监听的方式, chennel.addReturnListener(ReturnListener rl) 传入已经重写过handleReturn方法的ReturnListener。

消费端ACK与NACK

消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿。但是对于服务器宕机等严重问题,我们需要 手动ACK 保障消费端消费成功。

// deliveryTag:消息在mq中的唯一标识  
// multiple:是否批量(和qos设置类似的参数)  
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。  
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)

如上代码,消息在 消费端重回队列 是为了对没有成功处理消息,把消息重新返回到Broker。一般来说,实际应用中都会关闭重回队列( 避免进入死循环 ),也就是设置为false。

死信队列DLX

死信队列(DLX Dead-Letter-Exchange):当消息在一个队列中变成死信之后,它会被重新推送到另一个队列,这个队列就是死信队列。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能。

项目地址:https://github.com/YunaiV/ruoyi-vue-pro

RocketMQ

阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品。

Rocket:火箭的意思。

微信图片_20220907141325.png

RocketMQ的核心概念

他有以下核心概念:Broker 、 Topic 、 Tag 、 MessageQueue 、 NameServer 、 Group 、 Offset 、 Producer 以及 Consumer 。

下面来详细介绍。

  • Broker :消息中转角色,负责 存储消息 ,转发消息。Broker 是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将 Topic 信息注册到NameServer,顺带一提底层的通信和连接都是 基于Netty实现 的。Broker 负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。官网上有数据显示:具有 上亿级消息堆积能力 ,同时可 严格保证消息的有序性
  • Topic :主题!它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅。
  • Tag :标签!可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同Topic而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
  • MessageQueue :一个Topic下可以设置多个消息队列,发送消息时执行该消息的Topic,RocketMQ会轮询该Topic下的所有队列将消息发出去。消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。
  • NameServer :类似Kafka中的ZooKeeper,但NameServer集群之间是 没有通信 的,相对ZK来说更加 轻量 。它主要负责对于源数据的管理,包括了对于 Topic 和路由信息的管理。每个Broker在启动的时候会到NameServer注册,Producer在发送消息前会根据Topic去NameServer 获取对应Broker的路由信息 ,Consumer也会定时获取 Topic 的路由信息。
  • Producer :生产者,支持三种方式发送消息:同步、异步和单向 单向发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应,且 没有回调函数 。异步发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应, 有回调函数 。同步发送 :消息发出去后,等待服务器响应成功或失败,才能继续后面的操作。
  • Consumer :消费者,支持 PUSH 和 PULL 两种消费模式,支持 集群消费广播消费 集群消费 :该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。广播消费 :会发给消费者组中的每一个消费者进行消费。相当于 RabbitMQ 的发布订阅模式。
  • Group :分组,一个组可以订阅多个Topic。分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的
  • Offset :在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,Offset为Java Long类型,64位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为Message Queue是一个长度无限的数组, Offset 就是下标。

延时消息

开源版的RocketMQ不支持任意时间精度,仅支持特定的level,例如定时5s,10s,1min等。其中,level=0级表示不延时,level=1表示1级延时,level=2表示2级延时,以此类推。

延时等级如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序 或者 全局有序 。

事务消息

微信图片_20220907141358.png

消息队列MQ提供类似X/Open XA的分布式事务功能,通过消息队列MQ事务消息能达到分布式事务的最终一致。上图说明了事务消息的大致流程:正常事务消息的发送和提交、事务消息的补偿流程。

事务消息发送及提交:
  1. 发送half消息
  2. 服务端响应消息写入结果
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行);
  4. 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)。

事务消息的补偿流程:

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”;
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态。
  3. 根据本地事务状态,重新Commit或RollBack

其中,补偿阶段用于解决消息Commit或Rollback发生超时或者失败的情况。

事务消息状态:

事务消息共有三种状态:提交状态、回滚状态、中间状态:

  1. TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
  2. TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  3. TransactionStatus.Unkonwn:中间状态,它代表需要检查消息队列来确定消息状态。

RocketMQ的高可用机制

RocketMQ是天生支持分布式的,可以配置主从以及水平扩展。

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。

消息消费的高可用(主从):

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。

RocketMQ目前还不支持把Slave自动转成Master ,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。

消息发送高可用(配置多个主节点):

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。

主从复制:

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。

  • 同步复制 :同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态。如果Master出故障, Slave上有全部的备份数据,容易恢复同步复制会增大数据写入延迟,降低系统吞吐量。
  • 异步复制 :异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失

通常情况下,应该把Master和Save配置成同步刷盘方式,主从之间配置成异步的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

负载均衡

Producer负载均衡:

Producer端,每个实例在发消息的时候,默认会 轮询 所有的Message Queue发送,以达到让消息平均落在不同的Queue上。而由于Queue可以散落在不同的Broker,所以消息就发送到不同的Broker下,如下图:

微信图片_20220907141431.png

Consumer负载均衡:

如果Consumer实例的数量比Message Queue的总数量还多的话, 多出来的Consumer实例将无法分到Queue ,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让Queue的总数量大于等于Consumer的数量。

  • 消费者的集群模式:启动多个消费者就可以保证消费者的负载均衡(均摊队列)
  • 默认使用的是均摊队列 :会按照Queue的数量和实例的数量平均分配Queue给每个实例,这样每个消费者可以均摊消费的队列,如下图所示6个队列和三个生产者。
  • 另外一种平均的算法 环状轮流分Queue 的形式,每个消费者,均摊不同主节点的一个消息队列,如下图所示:

对于广播模式并不是负载均衡的,要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。死信队列的名称是 %DLQ%+ConsumGroup 。

死信队列具有以下特性:

  1. 一个死信队列对应一个Group ID, 而不是对应单个消费者实例。
  2. 如果一个Group ID未产生死信消息,消息队列RocketMQ不会为其创建相应的死信队列。
  3. 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic。

基于微服务的思想,构建在 B2C 电商场景下的项目实战。核心技术栈,是 Spring Boot + Dubbo 。未来,会重构成 Spring Cloud Alibaba 。

项目地址:https://github.com/YunaiV/onemall

相关文章
|
2月前
|
消息中间件 存储 Kafka
RabbitMQ、RocketMQ和Kafka全面对决,谁是最佳选择?
1、应用场景 1.RabbitMQ: 适用于易用性和灵活性要求较高的场景 异步任务处理:RabbitMQ提供可靠的消息传递机制,适用于处理异步任务,例如将耗时的任务放入消息队列中,然后由消费者异步处理,提高系统的响应速度和可伸缩性。 解耦系统组件:通过使用RabbitMQ作为消息中间件,不同的系统组件可以通过消息进行解耦,实现松耦合的架构,提高系统的可维护性和灵活性。 事件驱动架构:RabbitMQ的发布-订阅模式可以用于构建事件驱动架构,将系统中的事件作为消息发布到相应的主题,不同的消费者可以订阅感兴趣的主题进行相应的处理。
188 2
|
2月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
2月前
|
消息中间件 Cloud Native 物联网
深度剖析 RocketMQ 5.0,Apache RocketMQ:如何从互联网时代演进到云时代?
从整体技术架构上学习 RocketMQ 5.0 的云原生架构、一体化架构,最后再分别从业务场景切入,详细介绍 RocketMQ 5.0 在不同的业务场景提供的能力和关键技术原理,包括业务消息、流处理、物联网以及面向云时代的事件驱动场景。
107568 1
|
4月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
182 0
|
6月前
|
消息中间件 存储 架构师
RabbitMQ vs Kafka:正面交锋(2)
RabbitMQ 是一个消息代理中间件,而 Apache Kafka 是一个分布式流处理平台。这种差异可能看起来只是语义上的,但它会带来严重的影响,影响我们方便地实现各种系统功能。 例如 Kafka 最适合处理流数据,在同一主题同一分区内保证消息顺序,而 RabbitMQ 对流中消息的顺序只提供基本的保证。
66 1
|
6月前
|
消息中间件 存储 Kafka
RabbitMQ vs Kafka:正面交锋(1)
值得注意的是,无论消费者是否消费了这些消息,Kafka 都会将消息保留在分区中直至预先配置的时间段内。这种保留意味着消费者可以自由地重读过去的消息。此外,开发人员还可以使用 Kafka 的存储层来实现事件溯源和审计日志等机制。
61 1
|
6月前
|
消息中间件 存储 传感器
何时使用Kafka而不是RabbitMQ
在公司项目中,一般消息量都不大的情况下,博主推荐大家可以使用 RabbitMQ。消息量起来了可以考虑切换到 Kafka,但是也要根据公司内部对两种 MQ 的熟悉程度来进行选择,避免 MQ 出现问题时无法及时处理。
84 0
|
7月前
|
消息中间件 存储 Kafka
Kafka 与 RabbitMQ:比较功能和用例
Kafka 与 RabbitMQ:比较功能和用例
400 1
Kafka 与 RabbitMQ:比较功能和用例
|
7月前
|
消息中间件 存储 算法
MQ - 闲聊MQ一二事儿 (Kafka、RocketMQ 、Pulsar )
MQ - 闲聊MQ一二事儿 (Kafka、RocketMQ 、Pulsar )
169 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
617 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!