手写消息队列(基于RabbitMQ)

简介: 手写消息队列(基于RabbitMQ)

一、什么是消息队列?

提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途就是实现 生产者消费者模型

我们知道对于生产者消费者模型来说,它具有两个十分亮眼的特点:

  1. 解耦合.
  2. 削峰填谷.

(1)解耦合

在引入生产者消费者模型之前,两台服务器之间通常是直接交互,这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后,两台服务器之间不再进行直接通信,而是借助阻塞队列进行业务处理,起到了解耦的效果。

(2)削峰填谷

在引入生产者消费者模型之前,同样是两台服务器进行直接通信,如果在一个时间点,服务器 A 突然发送一组请求峰值,此刻服务器 B 也会随之感受到峰值,这种情况下很可能造成服务器故障。如果此时使用阻塞队列,A 将收到的请求发给队列,虽然队列中有很多请求,但是服务器 B 仍然和以按照原有的节奏读取请求。

其实正是因为生产者消费者模型具有以上诸多好处,在实际的后端开发中,特别是分布式系统里,跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来,赋予更加丰富的功能,封装成一个独立的服务器程序,这个程序就称为 消息队列

二、需求整理

1、生产者消费者模型核心概念

  1. 生产者 (Producer): 负责将消息发送到消息队列中。
  2. 消费者 (Consumer): 从消息队列中接收和处理消息。。
  3. 中间人 (Broker): 它负责接收发布者发送的消息,并将这些消息存储在队列中,然后将这些消息传递给订阅者。
  4. 发布 (Publish): 生产者将消息投递到中间人的过程。
  5. 订阅 (Subscribe): 消费者在中间人这里注册的过程。只有消费者注册之后,当一个消息发布到消息队列时消息才会被发送给相应的订阅者。

根据以上概念,我们可以大致画出生产者消费者模型概念图:(PS:下面的每个模块均表示服务器)

一个生产者,一个消费者:

N 个生产者,N 个消费者:

2、Broker 设计概要

我们当前的目的是为了实现一个消息队列,其中 Broker 是最核心的部分,它主要负责消息的 存储转发,其中涉及到的核心概念如下:

  1. 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”,是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据,此时可以使用不同的 VirtualHost 进行区分。
  2. 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则,把消息转发给不同的 Queue。
  3. 队列 (Queue): 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息(根据订阅的队列)。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定

(一个 Queue 中的消息可以来自于多个 Exchange)。

绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。

  1. 消息 (Message): 具体来说是服务器之间的请求和响应。一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。

上述概念在 Broker 中的体现如图所示:

补充说明1:数据存储

以上这些概念对应的数据,既需要在内存中存储,也需要在硬盘上存储,以内存为主,硬盘为辅:

  • 内存存储:对于 MQ 来说,能够高效的处理转发数据时非常关键的指标,因此使用内存组织上述数据,能够得到较高的效率。
  • 硬盘存储:主要是为了防止内存中的数据随着进程/主机重启而丢失。

补充说明:2: 交换机类型与转发规则

上面我们提到,在生产者发送消息时,首先会将消息发送到 Broker 的交换机上,再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机,它们分别是: Direct(直接交换机)、Fanout(扇出交换机)、Topic(主题交换机)、Header(头部交换机)。其中 Header 这种方式比较复杂,也比较少见,当前项目中主要实现了前三种,下面分别对他们进行详细介绍:

前要说明:

  • 以下 bindingKey(绑定键)是在创建队列和交换机绑定关系时指定的关键字。
  • 以下 routingKey(路由键)是生产者发送消息时指定的关键字。

(1)Direct(直接交换机)

  1. 生产者发送消息时,会指定一个"目标队列"的名字
  2. 交换机收到消息后,查看当前交换机对应的绑定里面是否存在“队列名字”为routingKey的队列
  3. 如果有,就转发过去(把消息塞进对应的“目标队列”中)
  1. 如果没有,消息直接丢弃

(2)Fanout(扇出交换机)

  1. 生产者无需指定routingKey,直接发送消息到指定交换机
  2. 交换机收到消息后,直接将消息转发给当前交换机已绑定的所有队列中。(此时的 bindingKey 和 routingKey 对扇出交换机无效。)

(3)Topic(主题交换机)

  1. 生产者发送消息时,指定一个 routingKey
  2. 交换机收到消息后,查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配
  3. 如果有,就将消息转发到对应的绑定队列中。
  4. 如果没有,则将消息丢弃。


PS:以上所有概念出自 AMQP 协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

3、Broker 核心 API

Broker 基于以上概念和功能,需要实现的核心 API 如下:

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

补充说明:

  1. 从上面可以看出来,在进行创建操作时并没有使用 create,而是使用 declare,这是从语义上说明,这里的创建起到的效果是不存在则创建,存在则啥也不做。
  2. 上述并没有创建一个“消费消息”的API,这是因为当前我们使用的工作模式是 push(推),Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式,需要消费者主动调用 Broker 的 API 获取消息,当前项目不涉及这种模式。
  3. 在MQ中有两种应答方式,一种是自动应答,这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答,上述确认消息(basicAck)起到的效果,是可以让消费者 显式 的告诉 Broker,这个消息我处理完毕了,提高整个系统的可靠性。

4、客户端设计概要

生产者、消费者都是客户端程序,broker 则是作为服务器,通过网络进行通信。此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作,这里需要给客户端提供一组 API,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法。


客户端核心API:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

补充说明:

  1. 和 Broker 服务器 API 相比,客户端程序还提供了如下 4 个 API:创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。
  2. 这里的一个Connection对象代表一个TCP连接。
  3. Channel 是 Connection 内部逻辑上的链接,多个Channel复用同一个TCP连接,一个Connection 中可以包含多个Channel,每个Channel负责客户端中不同的模块,其中传输的数据是互不相干的。
  4. 这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。
  5. 上述客户端提供的 API 只是给业务代码进行调用,真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用(Remote Procedure Call,简称RPC)是一种计算机通信协议,它允许程序调用另一个地址空间(通常是不同的计算机)中的过程或函数,而无需程序员显式编写网络代码。通过使用RPC,应用程序可以像调用本地进程一样调用远程服务器上的进程。

5、小结

最后简单总结一下,我们大致需要做的工作,其中涉及到的细节问题,我们后面在进行补充:

  1. 实现生产者、BrokerServer、消费者三个部分
  2. 针对生产者、消费者来说,主要编写客户端和服务器的网络通信部分。
  3. 重点实现BrokerServer,包括内部的基本概念和核心 API。
  4. 数据的持久化存储。

三、具体实现

附上连接:

1、消息队列详细设计与实现思维导图

2、Gitee 完整代码地址


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
9天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
11天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
38 2
|
19天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
56 4
|
17天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
23天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
61 4
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
48 1
|
1月前
|
消息中间件 监控 测试技术
云消息队列RabbitMQ实践 - 评测
根据反馈,对本解决方案的实践原理已有一定理解,描述整体清晰但需在消息队列配置与使用上增加更多示例和说明以助理解。部署体验中获得了一定的引导和文档支持,尽管文档仍有待完善;期间出现的配置文件错误及依赖库缺失等问题已通过查阅资料解决。设计验证展示了云消息队列RabbitMQ的核心优势,包括高可用性和灵活性,未来可通过增加自动化测试来提高系统稳定性。实践后,用户对方案解决问题的能力及适用场景有了明确认识,认为其具有实际生产价值,不过仍需在性能优化、安全性增强及监控功能上进行改进以适应高并发和大数据量环境。
38 0
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。