一、RabbitMQ概述
1.1 概念
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。
1.2 优势
**可靠性(Reliablity):**使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
**灵活的路由(Flexible Routing):**在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
**消息集群(Clustering):**多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
**高可用(Highly Avaliable Queues):**队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
**多种协议(Multi-protocol):**支持多种消息队列协议,如STOMP、MQTT等。
**多种语言客户端(Many Clients):**几乎支持所有常用语言,比如Java、.NET、Ruby等。
**管理界面(Management UI):**提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
**跟踪机制(Tracing):**如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
**插件机制(Plugin System):**提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。
二、RabbitMQ组件
**Broker:**标识消息队列服务器实体.
**Virtual Host:**虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
**Exchange:**交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
direct交换机:完全匹配,才会路由过去,点对点通信模式。
fanout交换机:广播类型
topic交换机:发布-订阅模式,消息交于所有订阅的交换机。
**Queue:**消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
**Banding:**绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
**Channel:**信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
**Connection:**网络连接,比如一个TCP连接。
**Publisher:**消息的生产者,也是一个向交换器发布消息的客户端应用程序。
**Consumer:**消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
**Message:**消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
三、安装RabbitMQ
3.1 docker 安装
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -
4369 25672 【Erlang发现 集群端口】
5672 5671 【AMQP端口】
15672 【web管理控制台端口】
61613 61614 【STOMP协议端口】
1883 8883 【mqtt协议端口】
开机自启动
docker update rabbitmq --restart=always
访问:http://ip:15672
账号密码均为 guest
3.2 控制台体验Direct Exchange交换机
1.系统默认的交换机
2.创建一个交换机
3.为交换机绑定队列—创建队列
4.绑定交换机与队列
绑定交换机与队列,添加好队列的名字与 路由键的名字
5.发消息,需要现发消息给交换机,交换机才能路由给消息队列
刷新队列,既可以看到指定的队列中用准备好了一条消息,南无如何消费这条消息呢?
需要点击这条消息队列中去,获取该条消息。
3.3 控制台体验fanout Exchange交换机
1.创建fanout exchange 交换机
这种交换机会发送至所有的队列-----
2.为其绑定多个消息队列…
创建几个消息队列,绑定
3.向其中一个消息队列的发送消息,指定其中一个路由键【广播,可以不设置路由键】。
4.查看消息队列,发现所有的队列全部收到消息。
3.3 控制台体验Topic Exchange交换机
1.创建主题交换机
2.绑定消息队列,路由键如何设置???
xxx.# 代表xxx以后匹配0个或者多个
*.xxx 代表匹配以.xxx结尾的消息队列消息路由键
3.测试【略】
四、Springboot整合RabbitMQ
4.1 引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
自动生效,无需配置。IOC容器中会放入很多组件,如RabbitTemplate AmqpAdmin RabbitMessingTemplate连接工厂等等。
4.2 配置文件
spring.rabbitmq.host=192.168.52.10 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/
4.3 启动类开启注解
@EnableRabbit
4.4 AmqpAdmin创建Exchange/Queue/Binding
@Autowired AmqpAdmin amqpAdmin; /* * 创建交换机 */ @Test public void createExchange(){ // 名字,是否持久化,自动删除 DirectExchage directExchange = new DirectExchage("hello-java-exchage",true,false); amqp.declareExchange(directExchange); } /* * 创建队列 */ @Test public void createQueue(){ //名字 ,持久化,排他,自动删除 Queue queue = new Queue("hello-java-queue",true,false,false); amqp.declareQueue(); } /* * 绑定交换机与队列 */ @Test public void createBinding(){ //目的地,目的地类型,交换机,路由键,参数 Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null); amqp.declareBinding(binding); }
4.5 RabbitTemplate发送消息
@Autowired RabbitTemplate rabbitTemplate; /* * 发送消息 */ @Test public void sendMessage(){ // 交换机,路由键,消息内容【Object类型可以发送对象,但是java序列化了,对象必须序列化】 rabbitTemplate.converAndSend("hello-java-exchage","hello.java","消息体"); } 为了能够用json类型的展示,需要配以一个序列化器放入IOC容器中 @Configuration public class MyRabbitConfig{ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
4.6 @RabbitListener监听接受消息
// 必须放到组建中 方法中的类 @RabbitListener(queues = {"hello-java-queue"}) public void receiveMessage(Message message,实体类 xxx){ // 1.第一种接受消息 【麻烦】 byte[] body = message.getBody(); JSON.parse(body); // 2.第二种接收消息 直接传入实体类 System.out.println(message); }