目录
前言
来了解RabbitMQ一个重要的概念:Exchange交换机
为什么我们需要 Exchange 而不是直接将消息发送至队列呢?
AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中。
在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度。
1. Exchange概念
Exchange:接收消息,并根据路由键转发消息所绑定的队列。
Exchange
- 蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列。
- 黄色框:交换机和队列通过路由键有一个绑定的关系。
- 绿色框:消费端通过监听队列来接收消息。
Virtual Host(虚拟主机)
RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离, 可以认为每个虚拟主机是一个独立的命名空间, 拥有独立的队列、交换器和绑定关系。
用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,你无法将 vhost1 上的交换器与 vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全。
默认的虚拟主机名为
/
。
Routing Key(路由键)
消息的一个属性,可以看作是消息的类型(根据业务自定义),比如将程序不同级别的日志作为消息发送时,error级别的消息就可以使用 log.error 作为 routing key。
Binding Key(绑定键)
Queue 与 Exchang 绑定时的一个属性,可以看作 Queue 对哪种类型的业务消息感兴趣, Exchange会根据消息的 routing key 和 binding key 决定是否将该消息转发给一个 Queue。
2. 交换机属性
交换机属性
- Name:交换机名称
- Type:交换机类型——direct、topic、fanout、headers、sharding(此篇不讲)
- Durability:是否需要持久化,true为持久化
交换机属性
- Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
- Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
- Arguments:扩展参数,用于扩展AMQP协议自定制化使用
3. Direct Exchange(直连)
Direct Exchange: Direct Exchange 会对消息的 routing key 和 Queue 绑定到 Exchange 的 binding key 比对, 将消息转发给完全匹配(等值)的 Queue 。即 Queue 的 binding key = routing key 。
如下图,当消息的 RountingKey 为 orange 时,消息会被路由到 Q1 队列;当消息的 RountingKey 为 black 或 green 时,消息会被路由到 Q2 队列。
一个交换器绑定多个队列时,它们的 BindingKey 是可以相同的,如下图。此时当消息的 RountingKey 为 black 时,消息会同时被路由到 Q1 和 Q2 队列。
Direct Exchange(直连)
- 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
- 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
Direct Exchange(直连)
- 重点:routing key与队列queues 的key保持一致,即可以路由到对应的queue中。
3.1 Direct Exchange(直连)代码演示
我们来看下大概步骤:
- ConnectionFacorty:获取连接工厂
- Connection:一个连接
- Channel:数据通信信道,可发送和接收消息
- Queue:具体的消息存储队列
- Producer & Consumer 生产者和消费者
- 这个连接工厂需要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。
- Channel是我们RabbitMQ所有消息进行交互的关键。
生产端:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct111"; //5 发送 String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
消费端:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
queueDeclare 说明
channel.queueDeclare(queueName, true, false, false, null);
- 第一个参数:queuename:队列的名称
- 第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
- 第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
- 第四个参数:autoDelete:队列如果与Exchange未绑定,则自动删除
- 第五个参数:arguments:扩展参数
测试结果:
注意需要routingKey保持一致。可以自己尝试修改routingkey,是否能收到消息。
4. Topic Exchange
Topic Exchange: Topic Exchange 运行将 routing key 和 binding key 进行通配符匹配。
routing key 和 binding key 由多个单词使用
.
进行连接
- BindingKey 支持两个特殊符号:
#
和*
。其中*
用于匹配一个单词,#
用于匹配零个或者多个单词。以下是官方文档中的示例,交换器与队列的绑定情况如图所示,此时的路由情况如下:
- 路由键为
lazy.orange.elephant
的消息会发送给所有队列;- 路由键为
quick.orange.fox
的消息只会发送给 Q1 队列;- 路由键为
lazy.brown.fox
的消息只会发送给 Q2 队列;- 路由键为
lazy.pink.rabbit
的消息只会发送给 Q2 队列;- 路由键为
quick.brown.fox
的消息与任何绑定都不匹配;- 路由键为
orange
或quick.orange.male.rabbit
的消息也与任何绑定都不匹配。
Topic Exchange
- 所有发送到Topic Exchange的消息被转发到所有管线RouteKey中指定Topic的Queue上
- Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
Topic Exchange
Topic Exchange
在一堆消息中,每个不同的队列只关心自己需要的消息。
4.1 Topic Exchange代码演示
Topic Exchange生产端:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5 发送 String msg = "Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } }
Topic Exchange消费端:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; //String routingKey = "user.*"; String routingKey = "user.*"; // 1 声明交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 3 建立交换机和队列的绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
Topic Exchange测试结果:
注意一个问题:需要进行解绑
5. Fanout Exchange
Fanout Exchange: Fanout Exchange 是消息广播的模式, 不会去匹配路由键,直接把消息投递到所有绑定到 fanout 交换器中的队列</上,它就像一个广播站一样,它会向所有收听广播的用户发送消息。
Fanout Exchange
- 不处理路由键,只需要简单的将队里绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的
5.1 Fanout Exchange代码演示
Fanout Exchange生产端:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_fanout_exchange"; //5 发送 for(int i = 0; i < 10; i ++) { String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }
Fanout Exchange消费端:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不设置路由键 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
6. Exchange交换机其他属性
6.1 Bingding —— 绑定
标题
- Exchange和Exchange、Queue之间的连接关系
- Bingding可以包含RoutingKey或者参数
6.2 Queue——消息队列
Queue——消息队列
- 消息队列,实际存储消息数据
- Durability:是否持久化,Durable:是 ,Transient:否
- Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。
6.3 Message——消息
Message——消息
- 服务器与应用程序之间传送的数据
- 本质上就是一段数据,由Properties和Payload(Body)组成
- 常用属性:delivery mode、headers(自定义属性)
6.4 其他属性
- content_type、content_encoding、priority
- correlation_id、reply_to、expiration、message_id
- timestamp、type、user_id、app_id、cluster_id
6.5 Virtual Host虚拟主机
Virtual Host虚拟主机
- 虚拟地址,用于进行逻辑隔离,最上层的消息路由
- 一个Virtual Host里面可以有若干个Exchange和Queue
- 同一个Virtual Host里面不能有相同名称的Exchange或Queue