RabbitMQ精讲3:Exchange交换机类型-direct、topic、fanout

简介: RabbitMQ精讲3:Exchange交换机类型-direct、topic、fanout

目录

前言

1. Exchange概念

2. 交换机属性

3. Direct Exchange(直连)

3.1 Direct Exchange(直连)代码演示

生产端:

消费端:

queueDeclare 说明

测试结果:

4. Topic Exchange

4.1 Topic Exchange代码演示

Topic Exchange生产端:

Topic Exchange消费端:

Topic Exchange测试结果:

5. Fanout Exchange

5.1 Fanout Exchange代码演示

Fanout Exchange生产端:

Fanout Exchange消费端:

6. Exchange交换机其他属性

6.1 Bingding —— 绑定

6.2 Queue——消息队列

6.3 Message——消息

6.4 其他属性

6.5 Virtual Host虚拟主机



前言

来了解RabbitMQ一个重要的概念:Exchange交换机

为什么我们需要 Exchange 而不是直接将消息发送至队列呢?

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中。

 

在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度

 


1. Exchange概念

Exchange:接收消息,并根据路由键转发消息所绑定的队列。

Exchange

  • 蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列。
  • 黄色框:交换机和队列通过路由键有一个绑定的关系。
  • 绿色框:消费端通过监听队列来接收消息。


Virtual Host(虚拟主机)

RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离, 可以认为每个虚拟主机是一个独立的命名空间, 拥有独立的队列、交换器和绑定关系。

用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,你无法将 vhost1 上的交换器与 vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全。

默认的虚拟主机名为 /


Routing Key(路由键)

消息的一个属性,可以看作是消息的类型(根据业务自定义),比如将程序不同级别的日志作为消息发送时,error级别的消息就可以使用 log.error 作为 routing key。


Binding Key(绑定键)

Queue 与 Exchang 绑定时的一个属性,可以看作 Queue 对哪种类型的业务消息感兴趣, Exchange会根据消息的 routing key 和 binding key 决定是否将该消息转发给一个 Queue。


2. 交换机属性

交换机属性

  • Name:交换机名称
  • Type:交换机类型——direct、topic、fanout、headers、sharding(此篇不讲)
  • Durability:是否需要持久化,true为持久化

交换机属性

  • Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  • Arguments:扩展参数,用于扩展AMQP协议自定制化使用


3. Direct Exchange(直连)

Direct Exchange: Direct Exchange 会对消息的 routing key 和 Queue 绑定到 Exchange 的 binding key 比对, 将消息转发给完全匹配(等值)的 Queue 。即 Queue 的 binding key = routing key 。

如下图,当消息的 RountingKey 为 orange 时,消息会被路由到 Q1 队列;当消息的 RountingKey 为 black 或 green 时,消息会被路由到 Q2 队列。

一个交换器绑定多个队列时,它们的 BindingKey 是可以相同的,如下图。此时当消息的 RountingKey 为 black 时,消息会同时被路由到 Q1 和 Q2 队列。

Direct Exchange(直连)

  • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
  • 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

Direct Exchange(直连)

  • 重点:routing key与队列queues 的key保持一致,即可以路由到对应的queue中。


3.1 Direct Exchange(直连)代码演示

我们来看下大概步骤:

  • ConnectionFacorty:获取连接工厂
  • Connection:一个连接
  • Channel:数据通信信道,可发送和接收消息
  • Queue:具体的消息存储队列
  • Producer & Consumer 生产者和消费者
  • 这个连接工厂需要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。
  • Channel是我们RabbitMQ所有消息进行交互的关键。


生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4DirectExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String routingKey = "test.direct111";
    //5 发送
    String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
    channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());     
  }
}


消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4DirectExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String exchangeType = "direct";
    String queueName = "test_direct_queue";
    String routingKey = "test.direct";
    //表示声明了一个交换机
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    //表示声明了一个队列
    channel.queueDeclare(queueName, false, false, false, null);
    //建立一个绑定关系:
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


queueDeclare 说明

channel.queueDeclare(queueName, true, false, false, null);
  • 第一个参数:queuename:队列的名称
  • 第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
  • 第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
  • 第四个参数:autoDelete:队列如果与Exchange未绑定,则自动删除
  • 第五个参数:arguments:扩展参数


测试结果:

注意需要routingKey保持一致。可以自己尝试修改routingkey,是否能收到消息。


4. Topic Exchange

Topic Exchange: Topic Exchange 运行将 routing key 和 binding key 进行通配符匹配。

routing key 和 binding key 由多个单词使用 . 进行连接

  • BindingKey 支持两个特殊符号:#* 。其中 * 用于匹配一个单词, # 用于匹配零个或者多个单词。

以下是官方文档中的示例,交换器与队列的绑定情况如图所示,此时的路由情况如下:

  • 路由键为 lazy.orange.elephant 的消息会发送给所有队列;
  • 路由键为 quick.orange.fox 的消息只会发送给 Q1 队列;
  • 路由键为 lazy.brown.fox 的消息只会发送给 Q2 队列;
  • 路由键为 lazy.pink.rabbit 的消息只会发送给 Q2 队列;
  • 路由键为 quick.brown.fox 的消息与任何绑定都不匹配;
  • 路由键为 orangequick.orange.male.rabbit 的消息也与任何绑定都不匹配。

Topic Exchange

  • 所有发送到Topic Exchange的消息被转发到所有管线RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

Topic Exchange

Topic Exchange

在一堆消息中,每个不同的队列只关心自己需要的消息。


4.1 Topic Exchange代码演示

Topic Exchange生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_topic_exchange";
    String routingKey1 = "user.save";
    String routingKey2 = "user.update";
    String routingKey3 = "user.delete.abc";
    //5 发送
    String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
    channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
    channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());  
    channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
    channel.close();  
        connection.close();  
  }
}


Topic Exchange消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4TopicExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_topic_exchange";
    String exchangeType = "topic";
    String queueName = "test_topic_queue";
    //String routingKey = "user.*";
    String routingKey = "user.*";
    // 1 声明交换机 
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    // 2 声明队列
    channel.queueDeclare(queueName, false, false, false, null);
    // 3 建立交换机和队列的绑定关系:
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


Topic Exchange测试结果:

注意一个问题:需要进行解绑


5. Fanout Exchange

Fanout Exchange: Fanout Exchange 是消息广播的模式, 不会去匹配路由键,直接把消息投递到所有绑定到 fanout 交换器中的队列</上,它就像一个广播站一样,它会向所有收听广播的用户发送消息。

Fanout Exchange

  • 不处理路由键,只需要简单的将队里绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的


5.1 Fanout Exchange代码演示

Fanout Exchange生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_fanout_exchange";
    //5 发送
    for(int i = 0; i < 10; i ++) {
      String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
      channel.basicPublish(exchangeName, "", null , msg.getBytes());      
    }
    channel.close();  
        connection.close();  
  }
}


Fanout Exchange消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4FanoutExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_fanout_exchange";
    String exchangeType = "fanout";
    String queueName = "test_fanout_queue";
    String routingKey = ""; //不设置路由键
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


6. Exchange交换机其他属性

6.1 Bingding —— 绑定

标题

  • Exchange和Exchange、Queue之间的连接关系
  • Bingding可以包含RoutingKey或者参数


6.2 Queue——消息队列

Queue——消息队列

  • 消息队列,实际存储消息数据
  • Durability:是否持久化,Durable:是 ,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。


6.3 Message——消息

Message——消息

  • 服务器与应用程序之间传送的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery mode、headers(自定义属性)


6.4 其他属性

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • timestamp、type、user_id、app_id、cluster_id


6.5 Virtual Host虚拟主机

Virtual Host虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
172 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
64 0
|
2月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
50 0
|
5月前
|
消息中间件
02.交换机RabbitMQ交换机
02.交换机RabbitMQ交换机
61 0
|
6月前
|
网络性能优化 网络虚拟化 网络架构
配置接口限速示例(盒式交换机)
接口限速简介 接口限速对通过整个端口的全部报文流量速率进行限制,不对具体流量进行区分,可以实现给某个接口分配固定的带宽,控制方式单一,配置简单。 入方向与出方向的接口限速属于并列关系,用户可以根据需要同时配置,也可以单独配置。
|
18天前
|
安全 网络安全 数据安全/隐私保护
Cisco-交换机配置聚合端口
Cisco-交换机配置聚合端口
|
4月前
|
网络安全 数据安全/隐私保护 网络虚拟化
|
3月前
|
数据中心
配置案例 | CE交换机如何配置堆叠?
配置案例 | CE交换机如何配置堆叠?
|
3月前
盒式交换机又是如何配置堆叠的呢?
盒式交换机又是如何配置堆叠的呢?
|
3月前
|
前端开发 数据中心
数据中心框式交换机如何配置堆叠?
数据中心框式交换机如何配置堆叠?

热门文章

最新文章