RabbitMQ精讲3:Exchange交换机类型-direct、topic、fanout

简介: RabbitMQ精讲3:Exchange交换机类型-direct、topic、fanout

目录

前言

1. Exchange概念

2. 交换机属性

3. Direct Exchange(直连)

3.1 Direct Exchange(直连)代码演示

生产端:

消费端:

queueDeclare 说明

测试结果:

4. Topic Exchange

4.1 Topic Exchange代码演示

Topic Exchange生产端:

Topic Exchange消费端:

Topic Exchange测试结果:

5. Fanout Exchange

5.1 Fanout Exchange代码演示

Fanout Exchange生产端:

Fanout Exchange消费端:

6. Exchange交换机其他属性

6.1 Bingding —— 绑定

6.2 Queue——消息队列

6.3 Message——消息

6.4 其他属性

6.5 Virtual Host虚拟主机



前言

来了解RabbitMQ一个重要的概念:Exchange交换机

为什么我们需要 Exchange 而不是直接将消息发送至队列呢?

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中。

 

在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度

 


1. Exchange概念

Exchange:接收消息,并根据路由键转发消息所绑定的队列。

Exchange

  • 蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列。
  • 黄色框:交换机和队列通过路由键有一个绑定的关系。
  • 绿色框:消费端通过监听队列来接收消息。


Virtual Host(虚拟主机)

RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离, 可以认为每个虚拟主机是一个独立的命名空间, 拥有独立的队列、交换器和绑定关系。

用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,你无法将 vhost1 上的交换器与 vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全。

默认的虚拟主机名为 /


Routing Key(路由键)

消息的一个属性,可以看作是消息的类型(根据业务自定义),比如将程序不同级别的日志作为消息发送时,error级别的消息就可以使用 log.error 作为 routing key。


Binding Key(绑定键)

Queue 与 Exchang 绑定时的一个属性,可以看作 Queue 对哪种类型的业务消息感兴趣, Exchange会根据消息的 routing key 和 binding key 决定是否将该消息转发给一个 Queue。


2. 交换机属性

交换机属性

  • Name:交换机名称
  • Type:交换机类型——direct、topic、fanout、headers、sharding(此篇不讲)
  • Durability:是否需要持久化,true为持久化

交换机属性

  • Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  • Arguments:扩展参数,用于扩展AMQP协议自定制化使用


3. Direct Exchange(直连)

Direct Exchange: Direct Exchange 会对消息的 routing key 和 Queue 绑定到 Exchange 的 binding key 比对, 将消息转发给完全匹配(等值)的 Queue 。即 Queue 的 binding key = routing key 。

如下图,当消息的 RountingKey 为 orange 时,消息会被路由到 Q1 队列;当消息的 RountingKey 为 black 或 green 时,消息会被路由到 Q2 队列。

一个交换器绑定多个队列时,它们的 BindingKey 是可以相同的,如下图。此时当消息的 RountingKey 为 black 时,消息会同时被路由到 Q1 和 Q2 队列。

Direct Exchange(直连)

  • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
  • 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

Direct Exchange(直连)

  • 重点:routing key与队列queues 的key保持一致,即可以路由到对应的queue中。


3.1 Direct Exchange(直连)代码演示

我们来看下大概步骤:

  • ConnectionFacorty:获取连接工厂
  • Connection:一个连接
  • Channel:数据通信信道,可发送和接收消息
  • Queue:具体的消息存储队列
  • Producer & Consumer 生产者和消费者
  • 这个连接工厂需要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。
  • Channel是我们RabbitMQ所有消息进行交互的关键。


生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4DirectExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String routingKey = "test.direct111";
    //5 发送
    String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
    channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());     
  }
}


消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4DirectExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String exchangeType = "direct";
    String queueName = "test_direct_queue";
    String routingKey = "test.direct";
    //表示声明了一个交换机
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    //表示声明了一个队列
    channel.queueDeclare(queueName, false, false, false, null);
    //建立一个绑定关系:
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


queueDeclare 说明

channel.queueDeclare(queueName, true, false, false, null);
  • 第一个参数:queuename:队列的名称
  • 第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
  • 第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
  • 第四个参数:autoDelete:队列如果与Exchange未绑定,则自动删除
  • 第五个参数:arguments:扩展参数


测试结果:

注意需要routingKey保持一致。可以自己尝试修改routingkey,是否能收到消息。


4. Topic Exchange

Topic Exchange: Topic Exchange 运行将 routing key 和 binding key 进行通配符匹配。

routing key 和 binding key 由多个单词使用 . 进行连接

  • BindingKey 支持两个特殊符号:#* 。其中 * 用于匹配一个单词, # 用于匹配零个或者多个单词。

以下是官方文档中的示例,交换器与队列的绑定情况如图所示,此时的路由情况如下:

  • 路由键为 lazy.orange.elephant 的消息会发送给所有队列;
  • 路由键为 quick.orange.fox 的消息只会发送给 Q1 队列;
  • 路由键为 lazy.brown.fox 的消息只会发送给 Q2 队列;
  • 路由键为 lazy.pink.rabbit 的消息只会发送给 Q2 队列;
  • 路由键为 quick.brown.fox 的消息与任何绑定都不匹配;
  • 路由键为 orangequick.orange.male.rabbit 的消息也与任何绑定都不匹配。

Topic Exchange

  • 所有发送到Topic Exchange的消息被转发到所有管线RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

Topic Exchange

Topic Exchange

在一堆消息中,每个不同的队列只关心自己需要的消息。


4.1 Topic Exchange代码演示

Topic Exchange生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_topic_exchange";
    String routingKey1 = "user.save";
    String routingKey2 = "user.update";
    String routingKey3 = "user.delete.abc";
    //5 发送
    String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
    channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
    channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());  
    channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
    channel.close();  
        connection.close();  
  }
}


Topic Exchange消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4TopicExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_topic_exchange";
    String exchangeType = "topic";
    String queueName = "test_topic_queue";
    //String routingKey = "user.*";
    String routingKey = "user.*";
    // 1 声明交换机 
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    // 2 声明队列
    channel.queueDeclare(queueName, false, false, false, null);
    // 3 建立交换机和队列的绑定关系:
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


Topic Exchange测试结果:

注意一个问题:需要进行解绑


5. Fanout Exchange

Fanout Exchange: Fanout Exchange 是消息广播的模式, 不会去匹配路由键,直接把消息投递到所有绑定到 fanout 交换器中的队列</上,它就像一个广播站一样,它会向所有收听广播的用户发送消息。

Fanout Exchange

  • 不处理路由键,只需要简单的将队里绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的


5.1 Fanout Exchange代码演示

Fanout Exchange生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_fanout_exchange";
    //5 发送
    for(int i = 0; i < 10; i ++) {
      String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
      channel.basicPublish(exchangeName, "", null , msg.getBytes());      
    }
    channel.close();  
        connection.close();  
  }
}


Fanout Exchange消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4FanoutExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_fanout_exchange";
    String exchangeType = "fanout";
    String queueName = "test_fanout_queue";
    String routingKey = ""; //不设置路由键
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


6. Exchange交换机其他属性

6.1 Bingding —— 绑定

标题

  • Exchange和Exchange、Queue之间的连接关系
  • Bingding可以包含RoutingKey或者参数


6.2 Queue——消息队列

Queue——消息队列

  • 消息队列,实际存储消息数据
  • Durability:是否持久化,Durable:是 ,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。


6.3 Message——消息

Message——消息

  • 服务器与应用程序之间传送的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery mode、headers(自定义属性)


6.4 其他属性

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • timestamp、type、user_id、app_id、cluster_id


6.5 Virtual Host虚拟主机

Virtual Host虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
|
5天前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
13 1
|
5月前
|
消息中间件 存储
【RabbitMQ教程】第四章 —— RabbitMQ - 交换机(上)
【RabbitMQ教程】第四章 —— RabbitMQ - 交换机
|
2月前
|
消息中间件 物联网 网络性能优化
MQTT常见问题之MQTT的topic超出上限25个如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
JSON 网络协议 物联网
MQTT协议问题之消息类型分类如何解决
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
49 3
|
3月前
|
消息中间件 算法 微服务
升华 RabbitMQ:解锁一致性哈希交换机的奥秘【RabbitMQ 十】
升华 RabbitMQ:解锁一致性哈希交换机的奥秘【RabbitMQ 十】
35 0
|
4月前
|
消息中间件 网络架构
【面试问题】什么是 MQ topic 交换器(模式匹配) ?
【1月更文挑战第27天】【面试问题】什么是 MQ topic 交换器(模式匹配) ?
|
4月前
|
消息中间件 Java
RabbitMQ中的Exchange是什么?它有哪些类型?
RabbitMQ中的Exchange是什么?它有哪些类型?
34 0
|
4月前
|
消息中间件 存储
RabbitMQ之交换机
【1月更文挑战第9天】 RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
132 1