一篇文章把RabbitMQ、RocketMQ、Kafka三元归一(一)

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 一篇文章把RabbitMQ、RocketMQ、Kafka三元归一(一)

RabbitMQ

RabbitMQ各组件的功能

  • Broker :一个RabbitMQ实例就是一个Broker
  • Virtual Host :虚拟主机。相当于MySQL的DataBase ,一个Broker上可以存在多个vhost,vhost之间相互隔离。每个vhost都拥有自己的队列、交换机、绑定和权限机制。vhost必须在连接时指定,默认的vhost是/。
  • Exchange :交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue :消息队列,用来保存消息直到发送给消费者。它是消息的容器。一个消息可投入一个或多个队列。
  • Banding :绑定关系,用于 消息队列和交换机之间的关联 。通过路由键( Routing Key )将交换机和消息队列关联起来。
  • Channel :管道,一条双向数据流通道。不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了管道的概念,以复用一条TCP连接。
  • Connection :生产者/消费者 与broker之间的TCP连接。
  • Publisher :消息的生产者。
  • Consumer :消息的消费者。
  • Message :消息,它是由消息头和消息体组成。消息头则包括 Routing-KeyPriority (优先级)等

微信图片_20220907141157.jpg

RabbitMQ的多种交换机类型

Exchange 分发消息给 Queue 时, Exchange 的类型对应不同的分发策略,有3种类型的 Exchange :DirectFanoutTopic

  • Direct :消息中的 Routing Key 如果和 Binding 中的 Routing Key 完全一致, Exchange 就会将消息分发到对应的队列中。
  • Fanout :每个发到 Fanout 类型交换机的消息都会分发到所有绑定的队列上去。Fanout交换机没有 Routing Key 。它在三种类型的交换机中转发消息是最快的
  • Topic :Topic交换机通过模式匹配分配消息,将 Routing Key 和某个模式进行匹配。它只能识别两个 通配符#*# 匹配0个或多个单词, * 匹配1个单词。

TTL

TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共2种。

  • 在消息发送时进行指定 。通过配置消息体的 Properties ,可以指定当前消息的过期时间。
  • 在创建Exchange时指定 。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。

生产者的消息确认机制

Confirm机制:

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接受应答,用来确认这条消息是否正常的发送到了Broker,这种方式也是 消息的可靠性投递的核心保障!

如何实现Confirm确认消息?

微信图片_20220907141228.jpg

  1. 在channel上开启确认模式channel.confirmSelect()
  2. 在channel上开启监听 :addConfirmListener ,监听成功和失败的处理结果,根据具体的结果对消息进行重新发送或记录日志处理等后续操作。

Return消息机制:

Return Listener 用于处理一些不可路由的消息

我们的消息生产者,通过指定一个Exchange和Routing,把消息送达到某一个队列中去,然后我们的消费者监听队列进行消息的消费处理操作。

但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达消息,就需要使用到Returrn Listener

基础API中有个关键的配置项 Mandatory :如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。

同样,通过监听的方式, chennel.addReturnListener(ReturnListener rl) 传入已经重写过handleReturn方法的ReturnListener。

消费端ACK与NACK

消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿。但是对于服务器宕机等严重问题,我们需要 手动ACK 保障消费端消费成功。

// deliveryTag:消息在mq中的唯一标识  
// multiple:是否批量(和qos设置类似的参数)  
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。  
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)

如上代码,消息在 消费端重回队列 是为了对没有成功处理消息,把消息重新返回到Broker。一般来说,实际应用中都会关闭重回队列( 避免进入死循环 ),也就是设置为false。

死信队列DLX

死信队列(DLX Dead-Letter-Exchange):当消息在一个队列中变成死信之后,它会被重新推送到另一个队列,这个队列就是死信队列。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能。

项目地址:https://github.com/YunaiV/ruoyi-vue-pro

RocketMQ

阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品。

Rocket:火箭的意思。

微信图片_20220907141325.png

RocketMQ的核心概念

他有以下核心概念:Broker 、 Topic 、 Tag 、 MessageQueue 、 NameServer 、 Group 、 Offset 、 Producer 以及 Consumer 。

下面来详细介绍。

  • Broker :消息中转角色,负责 存储消息 ,转发消息。Broker 是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将 Topic 信息注册到NameServer,顺带一提底层的通信和连接都是 基于Netty实现 的。Broker 负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。官网上有数据显示:具有 上亿级消息堆积能力 ,同时可 严格保证消息的有序性
  • Topic :主题!它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅。
  • Tag :标签!可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同Topic而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
  • MessageQueue :一个Topic下可以设置多个消息队列,发送消息时执行该消息的Topic,RocketMQ会轮询该Topic下的所有队列将消息发出去。消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。
  • NameServer :类似Kafka中的ZooKeeper,但NameServer集群之间是 没有通信 的,相对ZK来说更加 轻量 。它主要负责对于源数据的管理,包括了对于 Topic 和路由信息的管理。每个Broker在启动的时候会到NameServer注册,Producer在发送消息前会根据Topic去NameServer 获取对应Broker的路由信息 ,Consumer也会定时获取 Topic 的路由信息。
  • Producer :生产者,支持三种方式发送消息:同步、异步和单向 单向发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应,且 没有回调函数 。异步发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应, 有回调函数 。同步发送 :消息发出去后,等待服务器响应成功或失败,才能继续后面的操作。
  • Consumer :消费者,支持 PUSH 和 PULL 两种消费模式,支持 集群消费广播消费 集群消费 :该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。广播消费 :会发给消费者组中的每一个消费者进行消费。相当于 RabbitMQ 的发布订阅模式。
  • Group :分组,一个组可以订阅多个Topic。分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的
  • Offset :在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,Offset为Java Long类型,64位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为Message Queue是一个长度无限的数组, Offset 就是下标。

延时消息

开源版的RocketMQ不支持任意时间精度,仅支持特定的level,例如定时5s,10s,1min等。其中,level=0级表示不延时,level=1表示1级延时,level=2表示2级延时,以此类推。

延时等级如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序 或者 全局有序 。

事务消息

微信图片_20220907141358.png

消息队列MQ提供类似X/Open XA的分布式事务功能,通过消息队列MQ事务消息能达到分布式事务的最终一致。上图说明了事务消息的大致流程:正常事务消息的发送和提交、事务消息的补偿流程。

事务消息发送及提交:
  1. 发送half消息
  2. 服务端响应消息写入结果
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行);
  4. 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)。

事务消息的补偿流程:

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”;
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态。
  3. 根据本地事务状态,重新Commit或RollBack

其中,补偿阶段用于解决消息Commit或Rollback发生超时或者失败的情况。

事务消息状态:

事务消息共有三种状态:提交状态、回滚状态、中间状态:

  1. TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
  2. TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  3. TransactionStatus.Unkonwn:中间状态,它代表需要检查消息队列来确定消息状态。

RocketMQ的高可用机制

RocketMQ是天生支持分布式的,可以配置主从以及水平扩展。

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。

消息消费的高可用(主从):

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。

RocketMQ目前还不支持把Slave自动转成Master ,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。

消息发送高可用(配置多个主节点):

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。

主从复制:

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。

  • 同步复制 :同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态。如果Master出故障, Slave上有全部的备份数据,容易恢复同步复制会增大数据写入延迟,降低系统吞吐量。
  • 异步复制 :异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失

通常情况下,应该把Master和Save配置成同步刷盘方式,主从之间配置成异步的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

负载均衡

Producer负载均衡:

Producer端,每个实例在发消息的时候,默认会 轮询 所有的Message Queue发送,以达到让消息平均落在不同的Queue上。而由于Queue可以散落在不同的Broker,所以消息就发送到不同的Broker下,如下图:

微信图片_20220907141431.png

Consumer负载均衡:

如果Consumer实例的数量比Message Queue的总数量还多的话, 多出来的Consumer实例将无法分到Queue ,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让Queue的总数量大于等于Consumer的数量。

  • 消费者的集群模式:启动多个消费者就可以保证消费者的负载均衡(均摊队列)
  • 默认使用的是均摊队列 :会按照Queue的数量和实例的数量平均分配Queue给每个实例,这样每个消费者可以均摊消费的队列,如下图所示6个队列和三个生产者。
  • 另外一种平均的算法 环状轮流分Queue 的形式,每个消费者,均摊不同主节点的一个消息队列,如下图所示:

对于广播模式并不是负载均衡的,要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。死信队列的名称是 %DLQ%+ConsumGroup 。

死信队列具有以下特性:

  1. 一个死信队列对应一个Group ID, 而不是对应单个消费者实例。
  2. 如果一个Group ID未产生死信消息,消息队列RocketMQ不会为其创建相应的死信队列。
  3. 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic。

基于微服务的思想,构建在 B2C 电商场景下的项目实战。核心技术栈,是 Spring Boot + Dubbo 。未来,会重构成 Spring Cloud Alibaba 。

项目地址:https://github.com/YunaiV/onemall

相关文章
|
5月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
80 3
|
2月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
2月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
120 1
|
3月前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
119 1
|
3月前
|
消息中间件 数据采集 数据库
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
42 1
|
4月前
|
消息中间件 监控 物联网
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
240 0
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
114 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
64 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
364 9
|
5月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
88 3