一、什么是消息队列?
提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途就是实现 生产者消费者模型。
我们知道对于生产者消费者模型来说,它具有两个十分亮眼的特点:
- 解耦合.
- 削峰填谷.
(1)解耦合
在引入生产者消费者模型之前,两台服务器之间通常是直接交互,这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后,两台服务器之间不再进行直接通信,而是借助阻塞队列进行业务处理,起到了解耦的效果。
(2)削峰填谷
在引入生产者消费者模型之前,同样是两台服务器进行直接通信,如果在一个时间点,服务器 A 突然发送一组请求峰值,此刻服务器 B 也会随之感受到峰值,这种情况下很可能造成服务器故障。如果此时使用阻塞队列,A 将收到的请求发给队列,虽然队列中有很多请求,但是服务器 B 仍然和以按照原有的节奏读取请求。
其实正是因为生产者消费者模型具有以上诸多好处,在实际的后端开发中,特别是分布式系统里,跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来,赋予更加丰富的功能,封装成一个独立的服务器程序,这个程序就称为 消息队列。
二、需求整理
1、生产者消费者模型核心概念
- 生产者 (Producer): 负责将消息发送到消息队列中。
- 消费者 (Consumer): 从消息队列中接收和处理消息。。
- 中间人 (Broker): 它负责接收发布者发送的消息,并将这些消息存储在队列中,然后将这些消息传递给订阅者。
- 发布 (Publish): 生产者将消息投递到中间人的过程。
- 订阅 (Subscribe): 消费者在中间人这里注册的过程。只有消费者注册之后,当一个消息发布到消息队列时消息才会被发送给相应的订阅者。
根据以上概念,我们可以大致画出生产者消费者模型概念图:(PS:下面的每个模块均表示服务器)
一个生产者,一个消费者:
N 个生产者,N 个消费者:
2、Broker 设计概要
我们当前的目的是为了实现一个消息队列,其中 Broker 是最核心的部分,它主要负责消息的 存储 和 转发,其中涉及到的核心概念如下:
- 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”,是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据,此时可以使用不同的 VirtualHost 进行区分。
- 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则,把消息转发给不同的 Queue。
- 队列 (Queue): 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息(根据订阅的队列)。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定
(一个 Queue 中的消息可以来自于多个 Exchange)。
绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。
- 消息 (Message): 具体来说是服务器之间的请求和响应。一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。
上述概念在 Broker 中的体现如图所示:
补充说明1:数据存储
以上这些概念对应的数据,既需要在内存中存储,也需要在硬盘上存储,以内存为主,硬盘为辅:
- 内存存储:对于 MQ 来说,能够高效的处理转发数据时非常关键的指标,因此使用内存组织上述数据,能够得到较高的效率。
- 硬盘存储:主要是为了防止内存中的数据随着进程/主机重启而丢失。
补充说明:2: 交换机类型与转发规则
上面我们提到,在生产者发送消息时,首先会将消息发送到 Broker 的交换机上,再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机,它们分别是: Direct(直接交换机)、Fanout(扇出交换机)、Topic(主题交换机)、Header(头部交换机)。其中 Header 这种方式比较复杂,也比较少见,当前项目中主要实现了前三种,下面分别对他们进行详细介绍:
前要说明:
- 以下 bindingKey(绑定键)是在创建队列和交换机绑定关系时指定的关键字。
- 以下 routingKey(路由键)是生产者发送消息时指定的关键字。
(1)Direct
(直接交换机)
- 生产者发送消息时,会指定一个"目标队列"的名字
- 交换机收到消息后,查看当前交换机对应的绑定里面是否存在“队列名字”为routingKey的队列
- 如果有,就转发过去(把消息塞进对应的“目标队列”中)
- 如果没有,消息直接丢弃
(2)Fanout
(扇出交换机)
- 生产者无需指定routingKey,直接发送消息到指定交换机
- 交换机收到消息后,直接将消息转发给当前交换机已绑定的所有队列中。(此时的 bindingKey 和 routingKey 对扇出交换机无效。)
(3)Topic
(主题交换机)
- 生产者发送消息时,指定一个 routingKey
- 交换机收到消息后,查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配
- 如果有,就将消息转发到对应的绑定队列中。
- 如果没有,则将消息丢弃。
PS:以上所有概念出自 AMQP 协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
3、Broker 核心 API
Broker 基于以上概念和功能,需要实现的核心 API 如下:
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
补充说明:
- 从上面可以看出来,在进行创建操作时并没有使用 create,而是使用 declare,这是从语义上说明,这里的创建起到的效果是不存在则创建,存在则啥也不做。
- 上述并没有创建一个“消费消息”的API,这是因为当前我们使用的工作模式是 push(推),Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式,需要消费者主动调用 Broker 的 API 获取消息,当前项目不涉及这种模式。
- 在MQ中有两种应答方式,一种是自动应答,这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答,上述确认消息(basicAck)起到的效果,是可以让消费者 显式 的告诉 Broker,这个消息我处理完毕了,提高整个系统的可靠性。
4、客户端设计概要
生产者、消费者都是客户端程序,broker 则是作为服务器,通过网络进行通信。此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作,这里需要给客户端提供一组 API,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法。
客户端核心API:
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
补充说明:
- 和 Broker 服务器 API 相比,客户端程序还提供了如下 4 个 API:创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。
- 这里的一个Connection对象代表一个TCP连接。
- Channel 是 Connection 内部逻辑上的链接,多个Channel复用同一个TCP连接,一个Connection 中可以包含多个Channel,每个Channel负责客户端中不同的模块,其中传输的数据是互不相干的。
- 这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。
- 上述客户端提供的 API 只是给业务代码进行调用,真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用(Remote Procedure Call,简称RPC)是一种计算机通信协议,它允许程序调用另一个地址空间(通常是不同的计算机)中的过程或函数,而无需程序员显式编写网络代码。通过使用RPC,应用程序可以像调用本地进程一样调用远程服务器上的进程。
5、小结
最后简单总结一下,我们大致需要做的工作,其中涉及到的细节问题,我们后面在进行补充:
- 实现生产者、BrokerServer、消费者三个部分
- 针对生产者、消费者来说,主要编写客户端和服务器的网络通信部分。
- 重点实现BrokerServer,包括内部的基本概念和核心 API。
- 数据的持久化存储。
三、具体实现
附上连接:
1、消息队列详细设计与实现思维导图
2、Gitee 完整代码地址