RabbitMQ精讲3:Exchange交换机类型-direct、topic、fanout

简介: RabbitMQ精讲3:Exchange交换机类型-direct、topic、fanout

目录

前言

1. Exchange概念

2. 交换机属性

3. Direct Exchange(直连)

3.1 Direct Exchange(直连)代码演示

生产端:

消费端:

queueDeclare 说明

测试结果:

4. Topic Exchange

4.1 Topic Exchange代码演示

Topic Exchange生产端:

Topic Exchange消费端:

Topic Exchange测试结果:

5. Fanout Exchange

5.1 Fanout Exchange代码演示

Fanout Exchange生产端:

Fanout Exchange消费端:

6. Exchange交换机其他属性

6.1 Bingding —— 绑定

6.2 Queue——消息队列

6.3 Message——消息

6.4 其他属性

6.5 Virtual Host虚拟主机



前言

来了解RabbitMQ一个重要的概念:Exchange交换机

为什么我们需要 Exchange 而不是直接将消息发送至队列呢?

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中。

 

在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度

 


1. Exchange概念

Exchange:接收消息,并根据路由键转发消息所绑定的队列。

Exchange

  • 蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列。
  • 黄色框:交换机和队列通过路由键有一个绑定的关系。
  • 绿色框:消费端通过监听队列来接收消息。


Virtual Host(虚拟主机)

RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离, 可以认为每个虚拟主机是一个独立的命名空间, 拥有独立的队列、交换器和绑定关系。

用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,你无法将 vhost1 上的交换器与 vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全。

默认的虚拟主机名为 /


Routing Key(路由键)

消息的一个属性,可以看作是消息的类型(根据业务自定义),比如将程序不同级别的日志作为消息发送时,error级别的消息就可以使用 log.error 作为 routing key。


Binding Key(绑定键)

Queue 与 Exchang 绑定时的一个属性,可以看作 Queue 对哪种类型的业务消息感兴趣, Exchange会根据消息的 routing key 和 binding key 决定是否将该消息转发给一个 Queue。


2. 交换机属性

交换机属性

  • Name:交换机名称
  • Type:交换机类型——direct、topic、fanout、headers、sharding(此篇不讲)
  • Durability:是否需要持久化,true为持久化

交换机属性

  • Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  • Arguments:扩展参数,用于扩展AMQP协议自定制化使用


3. Direct Exchange(直连)

Direct Exchange: Direct Exchange 会对消息的 routing key 和 Queue 绑定到 Exchange 的 binding key 比对, 将消息转发给完全匹配(等值)的 Queue 。即 Queue 的 binding key = routing key 。

如下图,当消息的 RountingKey 为 orange 时,消息会被路由到 Q1 队列;当消息的 RountingKey 为 black 或 green 时,消息会被路由到 Q2 队列。

一个交换器绑定多个队列时,它们的 BindingKey 是可以相同的,如下图。此时当消息的 RountingKey 为 black 时,消息会同时被路由到 Q1 和 Q2 队列。

Direct Exchange(直连)

  • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
  • 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

Direct Exchange(直连)

  • 重点:routing key与队列queues 的key保持一致,即可以路由到对应的queue中。


3.1 Direct Exchange(直连)代码演示

我们来看下大概步骤:

  • ConnectionFacorty:获取连接工厂
  • Connection:一个连接
  • Channel:数据通信信道,可发送和接收消息
  • Queue:具体的消息存储队列
  • Producer & Consumer 生产者和消费者
  • 这个连接工厂需要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。
  • Channel是我们RabbitMQ所有消息进行交互的关键。


生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4DirectExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String routingKey = "test.direct111";
    //5 发送
    String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
    channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());     
  }
}


消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4DirectExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_direct_exchange";
    String exchangeType = "direct";
    String queueName = "test_direct_queue";
    String routingKey = "test.direct";
    //表示声明了一个交换机
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    //表示声明了一个队列
    channel.queueDeclare(queueName, false, false, false, null);
    //建立一个绑定关系:
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


queueDeclare 说明

channel.queueDeclare(queueName, true, false, false, null);
  • 第一个参数:queuename:队列的名称
  • 第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
  • 第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
  • 第四个参数:autoDelete:队列如果与Exchange未绑定,则自动删除
  • 第五个参数:arguments:扩展参数


测试结果:

注意需要routingKey保持一致。可以自己尝试修改routingkey,是否能收到消息。


4. Topic Exchange

Topic Exchange: Topic Exchange 运行将 routing key 和 binding key 进行通配符匹配。

routing key 和 binding key 由多个单词使用 . 进行连接

  • BindingKey 支持两个特殊符号:#* 。其中 * 用于匹配一个单词, # 用于匹配零个或者多个单词。

以下是官方文档中的示例,交换器与队列的绑定情况如图所示,此时的路由情况如下:

  • 路由键为 lazy.orange.elephant 的消息会发送给所有队列;
  • 路由键为 quick.orange.fox 的消息只会发送给 Q1 队列;
  • 路由键为 lazy.brown.fox 的消息只会发送给 Q2 队列;
  • 路由键为 lazy.pink.rabbit 的消息只会发送给 Q2 队列;
  • 路由键为 quick.brown.fox 的消息与任何绑定都不匹配;
  • 路由键为 orangequick.orange.male.rabbit 的消息也与任何绑定都不匹配。

Topic Exchange

  • 所有发送到Topic Exchange的消息被转发到所有管线RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

Topic Exchange

Topic Exchange

在一堆消息中,每个不同的队列只关心自己需要的消息。


4.1 Topic Exchange代码演示

Topic Exchange生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_topic_exchange";
    String routingKey1 = "user.save";
    String routingKey2 = "user.update";
    String routingKey3 = "user.delete.abc";
    //5 发送
    String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
    channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
    channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());  
    channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
    channel.close();  
        connection.close();  
  }
}


Topic Exchange消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4TopicExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_topic_exchange";
    String exchangeType = "topic";
    String queueName = "test_topic_queue";
    //String routingKey = "user.*";
    String routingKey = "user.*";
    // 1 声明交换机 
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    // 2 声明队列
    channel.queueDeclare(queueName, false, false, false, null);
    // 3 建立交换机和队列的绑定关系:
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


Topic Exchange测试结果:

注意一个问题:需要进行解绑


5. Fanout Exchange

Fanout Exchange: Fanout Exchange 是消息广播的模式, 不会去匹配路由键,直接把消息投递到所有绑定到 fanout 交换器中的队列</上,它就像一个广播站一样,它会向所有收听广播的用户发送消息。

Fanout Exchange

  • 不处理路由键,只需要简单的将队里绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的


5.1 Fanout Exchange代码演示

Fanout Exchange生产端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 创建Connection
    Connection connection = connectionFactory.newConnection();
    //3 创建Channel
    Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_fanout_exchange";
    //5 发送
    for(int i = 0; i < 10; i ++) {
      String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
      channel.basicPublish(exchangeName, "", null , msg.getBytes());      
    }
    channel.close();  
        connection.close();  
  }
}


Fanout Exchange消费端:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer4FanoutExchange {
  public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
    //4 声明
    String exchangeName = "test_fanout_exchange";
    String exchangeType = "fanout";
    String queueName = "test_fanout_queue";
    String routingKey = ""; //不设置路由键
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
  }
}


6. Exchange交换机其他属性

6.1 Bingding —— 绑定

标题

  • Exchange和Exchange、Queue之间的连接关系
  • Bingding可以包含RoutingKey或者参数


6.2 Queue——消息队列

Queue——消息队列

  • 消息队列,实际存储消息数据
  • Durability:是否持久化,Durable:是 ,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。


6.3 Message——消息

Message——消息

  • 服务器与应用程序之间传送的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery mode、headers(自定义属性)


6.4 其他属性

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • timestamp、type、user_id、app_id、cluster_id


6.5 Virtual Host虚拟主机

Virtual Host虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
238 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
5月前
|
消息中间件 Kubernetes RocketMQ
消息队列 MQ产品使用合集之topic是怎么选择分布在哪里brocker上面的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
116 2
|
3月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
72 2
|
4月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 测试技术 Apache
消息队列 MQ产品使用合集之在测试环境中拥有大量的topic会有什么影响
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
114 1
|
5月前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 RocketMQ
消息队列 MQ操作报错合集之无法自动创建topic,该怎么办
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
186 0
|
28天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
68 5
下一篇
无影云桌面