消息代理(Message Broker)
消息代理是一种消息验证、传输、路由的架构模式。在应用程序之间起到通信调度并最小化应用之间的依赖作用,是的应用程序可以高效的解耦通信过程。消息代理是一种中间件产品,核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作,应用场景如:
- 将消息路由到一个或多个目的地
- 将消息转化为其他的表现方式
- 执行消息的内聚,消息的分解,并将结果发送到他们的目的地,然后重新组合响应返回给消息用户
- 调用Web服务来检索数据
- 响应时间或错误
- 使用发布-订阅模式来提供内容或基于主题的消息路由
可选择的开源框架如下:
- ActiveMQ
- Kafka
- RabbitMQ
- RocketMQ
概述
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件,服务器使用高性能,可伸缩而闻名的Erlang语言编写而成,其集群和故障转移是构建在开放电信平台框架上的。
AMQP:Advanced Message Queuing Protocol,是一个面向消息中间件的开放式标准应用层协议,定义了消息方向、消息队列、消息路由(点到点和发布-订阅模式)、可靠性、安全性。
RabbitMQ以AMQP协议实现,所以他可以支持多种操作系统,多种编程语言,集合可以覆盖所有主流的企业级技术平台。在Spirng Cloud Bus中包含了对RabbitMQ的自动化默认配置。
spring amqp项目将核心spring概念应用于基于amqp的消息传递解决方案的开发。它提供了一个“模板”,作为发送和接收消息的高级抽象。它还通过“侦听器容器”支持消息驱动的pojo。这些库有助于管理amqp资源,同时促进依赖注入和声明性配置的使用。在所有这些情况下,您将看到与spring框架中jms支持的相似之处。
该项目由两部分组成:spring amqp是基本抽象,spring rabbit是rabbitmq实现。
特性
异步处理入站消息的监听器容器
发送和接收消息的RabbitTemplate
rabbitadmin用于自动声明队列、交换机和绑定
基本概念
- Borker:消息队列服务器的实体,是一个中间件应用,负责接收生产者的消息,然后将消息发送至消息接收者或者其他的Broker。
- Exchange:消息交换机,是消息第一个到达的地方,消息通过他指定的路由规则,分发到不同的消息队列中区
- Queue:消息队列,消息通过发送和路由之后最终到达的地方,到达Queue的消息即进入逻辑上等待消费的状态,每个消息都会被发送到一个或多个队列
- Binding:绑定,就是把Exchange和Queue按照路由规则绑定起来,也就是Exchange和Queue之间的虚拟连接
- Routing Key:路由关键字,Exchange根据关键字进行消息投递
- Virtual Host:虚拟主机,对Broker的虚拟划分,将消费者、生产者和他们依赖的AMQP相关结构进行隔离,一般都是为了安全考虑,比如,我们可以在一个Broker中设置多个虚拟主机,对不同用户进行权限的隔离。
- Connection:连接,代表生产者、消费者、Broker之间进行通信的物理网络
- Channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个连接中,可建立多个Channel,每个Channel代表一个会话任务,通过Channel可以隔离同一个连接中的不同交互内容。
- Producer:消息生产者,制造消息并发送消息的程序
- Consumer:消息消费者,接收消息并处理消息的程序
消息投递到队列过程
- 客户端连接到消息队列服务器,打开一个Channel
- 客户端声明一个Exchange,并设置相关属性
- 客户端声明一个Queue,并设置相关属性
- 客户端使用Routing Key,在Exchange和Queue之间建立好绑定关系
- 客户投递消息到Exchange
- Exchange接收到消息后,根据消息的Key和已经设置的Binding,进行消息路由,将消息投递到一个或多个Queue里
Exchange类型
- Direct交换机:完全根据Key进行投递,比如,绑定时设置了Routing Key为abc,那么客户端提交的信息,只有设置了Key为abc的才会被投递到队列中
- Topic交换机:对Key进行模式匹配后进行投递,可以使用符号
#
匹配一个或多个词,符号*
匹配正好一个词。如abc.#匹配abc.def.ghi,而abc.*只匹配abc.def - Fanout交换机:不需要任何Key,采用广播的模式,一个消息进来时,投递到与该交换机绑定的多有队列。
RabbitMQ支持消息持久化到磁盘
- Exchange持久化,在声明时指定durable=>1
- Queue持久化,在声明时指定durable=>1
- 消息持久化,在投递时指定delivery_mode=>2(1是非持久化)
注意:Exchange和Queue都是持久化的,那么他们之间的Binding也是持久化的,如果Exchange和Queue两者之间有一个是持久化的,一个是非持久化的,就不允许建立绑定。
安装与使用
windows平台
- 由于其基于Erlang语言编写,所以先安装Erlang,下载地址
双击安装,安装路径不要与中文与空格
- 配置环境变量
- path中添加
- cmd中测试
- 安装RabbitMQ,下载地址
下载后双击安装 - RabbitMQ Server安装完成之后,会自动注册为服务,并以默认配置进行启动
可能的问题:启动失败,通常是由于用户名为中文,导致默认的db和log目录访问出现问题。解决方法,写在RabbitMQ,配置环境变量RABBITMQ_BASE为一个不含中文的路径,如E:\rabbitmq,然后重新安装
Rabbit管理
- 进入rabbitMQ安装目录下的sbin目录,执行如下命令开启WEB管理插件,
rabbitmq-plugins.bat enable rabbitmq_management
C:\software\RabbitMQ\rabbitmq_server-3.7.18\sbin>rabbitmq-plugins.bat enable rabbitmq_management Enabling plugins on node rabbit@DESKTOP-KJFR0VP: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@DESKTOP-KJFR0VP... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch started 3 plugins.
- 打开浏览器并访问
http://localhost:15672
- 默认用户名,密码均是guest
- 角色标签
none:不能访问management plugin
management:用户通过AMQP做的任何事外加如下内容
- 列出自己可以通过AMQP登入的virtual hosts
- 查看自己的virtual hosts中的queues、exchanges和bindings
- 查看和关闭自己的channels和connections
- 查看有关自己的virtual hosts的全局统计信息,包含其他用户在这些virtual hosts中的活动
policymarker:management可以做的任何事外加如下内容
- 查看、删除、创建自己的virtual hosts所属的policies和parameters
monitoring:management可以做的任何事外加如下内容
- 列出所有virtual hosts,包括他们不能登陆的virtual hosts
- 查看其他用户的connections和channels
- 查看节点级别的数据,如clustering和memory的使用情况
- 查看所有virtual hosts的全局统计信息
administrator:policymarker和monitoring可以做的任何事外加如下内容
- 创建virtual hosts
- 查看、创建和删除users
- 查看、创建和删除permissions
- 关闭其他用户的connections
快速入门
- 创建Spring Boot应用rabbitmq-demo
- pom.xml添加依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.7.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
- 在application.properties文件中添加如下内容
spring.application.name=rabbitmq-demo spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest srping.rabbitmq.password=guest
用户名与密码,可在上一节中管理端添加
- 创建消息生产者
import org.springframework.amqp.core.AmqpTemplate @Component public class Sender { // 定义了一套针对AMQP协议的基础操作 @Autowired private AmqpTemplate rabbitTemplate; public void send(){ String context = "hello " + new Date(); System.out.println("Sender: " + context); // 产生一个字符串,并发送到名为hello的队列中 this.rabbitTemplate.convertAndSend("hello", context); } }
- 创建消息消费者,实现对消息队列的消费
@Component // 定义对hello队列的监听 @RabbitListener(queues="hello") public class Receiver { // 指定对消息的处理方法 @RabbitHandler public void precess(String hello){ System.out.println("Receiver: " + hello); } }
- 创建配置类,用来配置队列、交换器、路由等高级信息。此处仅简单实现队列配置,完成一个基本的生产消费过程
@Configuration public class RabbitConfig { @Bean public Queue helloQueue(){ return new Queue("hello"); } }
- 主类
@SpringBootApplication public class RabbitMQApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQApplication.class, args); } }
- 单元测试类
@RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitMQApplication.class) public class RabbitMQApplicationTest { @Autowired private Sender sender; @Test public void hello(){ sender.send(); } }
- 启动应用主类,查看控制台
- 运行单元测试类,可以在其控制台看到Sender的信息,在主类的控制台,可以看到Receiver的信息。
以上便是生产者(消息发送)与消费者(消息接收)的示例,在整个生产消费过程中,生产与消费是一个异步操作,这也是在分布式系统中要使用消息代理的重要原因。
1.注册成功-短信提醒-》消息队列
2.用户行为日志
3.秒杀