RabbitMQ中真的只有四种交换器吗?

简介: RabbitMQ中真的只有四种交换器吗?

如果大家看了我之前的文章,应该都知道,rabbitmq中常用的交换器有4中,分别是:direct、fanout、topic、headers。


那么rabbitmq中,真的只有4中交换器吗?


今天和大家一起研究下。


一个简单的方式,通过启动rabbitmq_management插件,在管理控制台去尝试创建exchange。

这里说明下,我使用的3.8.3版本的rabbitmq。不同版本的rabbitmq可能存在一些查询,有些类型的exchange可能需要安装对应的插件后才能创建。

25.png


可以发现多了2种类型的exchange,分别是:x-consistent-hash和x-modulus-hash以及一个参数Alternate exchange.


常规的四种交换器类型,这里就不再赘述,有兴趣的小伙伴可以看看我整理的RabbitMQ入门知识整理


x-consistent-hash和x-modulus-hash类型的exchange,是在从 RabbitMQ 3.6.0 版本开始,整合到 RabbitMQ 发布包中的。

之前的版本中,需要自己手动下载插件去安装,具体操作这里就不展开了。


x-consistent-hash


git地址:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange

说明:

一致性hash交换器,主要是使用一致性hash算法将消息分发到绑定在交换器上的队列上。


工作原理:

当使用“一致性哈希”作为交换类型的情况下,会根据消息属性(最常见的是路由密钥 routing key)计算一个hash值,然后根据这个hash值,将消息分发到绑定在该交换器下的队列中。

因此,如果没有发生绑定更改,具有相同路由关键字的消息将具有相同哈希值将被路由到同一队列。


下面是工作原理图:

24.png

权重


当队列绑定到Consistent Hash交换时,绑定密钥 binding key会使用一个数字字符串,表示绑定权重:这个桶的数量将与目标队列关联(范围的部分)。

简单的说,就是绑定键的数字越大,那么绑定的队列的权重就越大,分发消息的时候接受到的消息就越多。


请注意,绑定中的routing_keys是数字字符串。 这是因为AMQP 0-9-1指定routing_key字段必须为字符串。


另外,发布消息的时候,路由键routing_key一定要是随机的。


绑定键binding key决定队列的权重

路由键routing_key决定消息的分发


x-modulus-hash


git地址:https://github.com/rabbitmq/rabbitmq-sharding

x-modulus-hash路由器对应之前的rabbitmq sharding插件,主要是实现自动对队列进行分区。

也就是说,一旦将一个exchange 定义为x-modulus-hash,就可以在每个集群节点上自动创建支持队列,并在它们之间共享消息。rabbitmq sharding向使用者显示了一个队列,但可能在后台运行了它对应的多个队列。rabbitmq sharding插件为您提供了一个集中的位置,通过向集群中的其他节点添加队列,您可以将消息以及跨多个节点的负载平衡发送到该位置。


工作原理图:

23.png


主要特征:新加节点后,新加自动分片

该插件的主要特性之一是,当将新节点添加到RabbitMQ集群时,该插件将在新节点上自动创建更多分片。假设节点a上有一个带有4个队列的分片,而节点b刚加入集群。该插件将在节点b上自动创建4个队列,并将它们“加入”分片分区。已经传递的消息将不会重新平衡,但是新到达的消息将被划分到新队列中。


两种路由器的选用:

如果只需要消息分区,而不需要自动调整分片数量的话,可以使用Consistent Hash Exchange;反之,如果需要根据策略或节点数量,动态调整分片数量的话,则选择x-modulus-hash。


备份交换器Alternate Exchange


生产者在发送消息时不设置mandatory 参数,那么消息达到路由器后匹配不到相应的队列后消息将会丢失。

设置了mandatory 参数,那么需要添加ReturnListener的编程逻辑。

如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器。


顾名思义 备份交换器就是当第一个交换器未能有效匹配到队列时,路由到备份交换器,再由备份交换器区匹配队列


模型图

从模型图中可以看到消息发送到名字为TestAE的路由器中,但是因为没有跟队列匹配,这个时候消息就会被发送到名字为exchange-unroute的备份交换器,这个交换器一般会为fanout型,随后就会被路由到AE-queue队列

22.png


核心代码:

//存储备份交换器的参数map
Map<String, Object> spare = new HashMap<String , Object>(2);
spare.put("alternate-exchange" , MY_SPARE);
//声明了一个direct 类型的交换器,并且添加存储备份交换器的map参数
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,spare);


完成示例代码:

public class SendSpare {
    private final static String QUEUE_NAME = "wsd_test";
    private final static String QUEUE_NAME_2 = "wsd_test2";
    private final static String EXCHANGE_NAME = "wsd_exchange";
    private final static String ROUTING_KEY = "wsd_exchange";
    private final static String EXCHANGE_KEY = "wsd_exchange";
    private final static String MY_SPARE = "mySpare";
    private static Connection connection =null;
    private static Channel channel = null;
    public static void main(String[] args) {
        Map<String, Object> spare = new HashMap<String , Object>(2);
        spare.put("alternate-exchange" , MY_SPARE);
        try{
            // 获取到连接以及mq通道
            connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
           channel = connection.createChannel();
           //声明了一个direct 类型的交换器
           channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,spare);
           //声明一个备胎路由器
            channel.exchangeDeclare(MY_SPARE,"fanout",true,false,null);
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //将路由与队列绑定,再为绑定的路径赋值一个绑定键
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
            //绑定备胎路由器
            channel.queueBind(QUEUE_NAME_2,MY_SPARE,"");
            //发送数据
            for (int i=0;i<10;i++){
                // 消息内容
                String message = "Hello World!"+i;
                //指定发送消息到哪个路由,以及他的路由键,消息等
                if (i%2==0){
                    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null, message.getBytes());
                }else {
                    //匹配不到队列
                    channel.basicPublish(EXCHANGE_NAME, "kkkk",null, message.getBytes());
                }
                System.out.println(" [x] Sent '" + message + "'");
                Thread.sleep(200);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //关闭通道和连接
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}


联邦交换器 Federation


Federation插件的高级目标是在代理之间传输消息而无需集群。


该插件可以让交换器和队列组成同盟。 一个联邦的交换或队列可以从一个或多个上游(远程交换和其他代理上的队列)接收消息,可以将上游发布的消息路由到本地队列。 一个联邦队列可以让一个本地消费者从上游队列接收消息。


rabbitmq-plugins  enable  rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management


21.png


工作原理:

20.png


其他


通过执行rabbitmq-plugins list命令,查看支持的默认带有哪些插件

19.png

rabbitmq_event_exchange
rabbitmq_jms_topic_exchange
rabbitmq_random_exchange
rabbitmq_recent_history_exchange


依次通过命令安装对应插件后:


rabbitmq-plugins  enable  rabbitmq_event_exchange



18.png

进一步研究,还会不会有其他一些插件呢?

去官网的插件网址:

https://www.rabbitmq.com/community-plugins.html

17.png

rabbitmq_lvc_exchange
rabbitmq_rtopic_exchange
rabbitmq_delayed_message_exchange 延迟消息插件
rabbitmq_management_exchange
pgsql_listen_exchange

总结


3.8.3版本的rabbitmq,默认有6种类型的交换器类型,分别是:

direct、fanout、topic、headers,x-consistent-hash和x-modulus-hash,一个参数Alternate exchange来实现备份交换器。


通过插件扩展,还可以声明如下交换器:

rabbitmq_event_exchange
rabbitmq_jms_topic_exchange
rabbitmq_random_exchange
rabbitmq_recent_history_exchange
rabbitmq_lvc_exchange
rabbitmq_rtopic_exchange
rabbitmq_delayed_message_exchange 延迟消息插件
rabbitmq_management_exchange
pgsql_listen_exchange
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储
RabbitMQ学习(七):交换器
RabbitMQ学习(七):交换器
RabbitMQ学习(七):交换器
|
消息中间件 Java 网络架构
RabbitMQ系列(三)RabbitMQ交换器Exchange介绍与实践
RabbitMQ交换器Exchange介绍与实践 RabbitMQ系列文章 RabbitMQ在Ubuntu上的环境搭建 深入了解RabbitMQ工作原理及简单使用 RabbitMQ交换器Exchange介绍与实践 RabbitMQ事务和Confirm发送方消息确认——深入解读 使用Docker部署RabbitMQ集群 你不知道的RabbitMQ集群架构全解 导读 有了Rabbit的基础知识之后(基础知识详见:深入解读RabbitMQ工作原理及简单使用),本章我们重点学习一下Rabbit里面的exchange(交换器)的知识。
1451 0
|
9月前
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
142 0
|
9月前
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
88 0
|
9月前
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
323 0
|
9月前
|
消息中间件 存储 网络协议
Rabbitmq的安装与使用
Rabbitmq的安装与使用
228 0
|
10月前
|
消息中间件 数据安全/隐私保护 Windows
【MQ】Windows上RabbitMQ的安装与启动
【MQ】Windows上RabbitMQ的安装与启动
256 0