rabbitmq-sharding插件使用

简介: rabbitmq-sharding插件使用

RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?


问题


RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?


答案是集群并不能的增加单个队列的吞吐量,这是因为RabbitMQ的普通集群只是共享元数据信息,达到将整个集群规模的队列扩大以增加吞吐量的目的。普通集群甚至不能保证消息数据的高可用,任意一个broker宕机,都会导致这个broker上的队列不可用。


而镜像队列也仅仅只是保证了实现镜像复制的队列的高可用。消费者并不能并发消费复制出来的队列。


那么RabbitMQ是否也能提高类似Kafka的topic分区的机制,来加大单个主题队列的吞吐量呢?


通过使用 RabbitMQ Sharding 插件、Consistent-hash Sharding Exchange 来更加灵活地动态均衡队列压力,可以更从容地达到百万并发的性能。


这里我重点介绍下,RabbitMQ Sharding 插件,有兴趣的伙伴可以自己研究下Consistent-hash Sharding Exchange,两者的基本思路一致,都是根据Routeing Key的hash值将消息分发到分片队列中。


原理介绍


官网:https://github.com/rabbitmq/rabbitmq-sharding


rabbitmq sharding插件为您自动对队列进行分区,也就是说,一旦您将一个exchange 定义为sharded,那么在每个集群节点上自动创建支持队列,并在它们之间共享消息。rabbitmq sharding向使用者显示了一个队列,但它可能是后台运行在它后面的多个队列。rabbitmq sharding插件为您提供了一个集中的位置,通过向集群中的其他节点添加队列,您可以将消息以及跨多个节点的负载平衡发送到该位置。

50.png




插件安装


查看当前插件

find / -name rabbitmq-plugins
cd /usr/sbin/
./rabbitmq-plugins list

51.png


如果有没有对应的插件,自己下载后复制插件到指定的目录

手动下载安装,https://www.rabbitmq.com/community-plugins/

52.png


RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

53.png


插件安装完成后可以通过命令sudo rabbitmq-plugins list查看已有插件列表,eg:

54.png


指定安装插件命令:

./rabbitmq-plugins enable rabbitmq_sharding


说明安装成功,重启生效:

service rabbitmq-server restart


队列分片


1.配置策略


find / -name rabbitmqctl
cd /usr/sbin/
./rabbitmqctl set_policy history-shard    "^history" \
  '{"shards-per-node": 2, "routing-key": "1234"}' \
  --apply-to exchanges

说明:

通过rabbitmqctl set_policy设置新增策略,策略名称为history-shard,

匹配规则为^history,shards-per-node表示每个节点上分片出2个分片队列,

routing-key为1234,应用到所有交换器exchanges上去匹配执行。


2、在管理界面,手动创建名称为history的交换器,交换器类型选择x-consistent-hash


55.png

56.png

57.png



如上图现实,说明我们的消息分片已经成功。


但是,我们怎么去消费history交换器下的消息呢?


代码实战


1、引入rabbitmq的依赖包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、生产者


public class ProductTest {
    private static final String EXCHANGE_NAME = "history";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("wanli");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
        for (int i = 0; i < 10000; i++) {
            //第一个参数是交换器名称,第二个参数是routeing key ,注意这里的routeing key一定要是随机的,不然消息都会发送到同一个队列中
            channel.basicPublish(EXCHANGE_NAME, String.valueOf(i), bldr.build(), "hello".getBytes("UTF-8"));
        }
        channel.waitForConfirmsOrDie(10000);
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}


消费者向名称为history的交换器,发送10000条routeing key为0~10000,内容为hello的消息

58.png


3、消费者


public class ConsumerTest {
    private static final String QUEUE_NAME = "history";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("wanli");
        factory.setPort(5672);
         Connection connection = factory.newConnection();
         Channel channel = connection.createChannel();
        //每次抓取的消息数量
        channel.basicQos(32);
        for (int i = 0; i < 10; i++) {
            Consumer consumer = new MyConsumer(channel);
            channel.basicConsume(QUEUE_NAME,consumer);
        }
        TimeUnit.SECONDS.sleep(120);
        channel.close();
        connection.close();
    }
    private  static class MyConsumer extends DefaultConsumer{
        public MyConsumer(Channel channel) {
            super(channel);
        }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收到的消息为:" + new String(body));
            super.getChannel().basicAck(envelope.getDeliveryTag(),false);
        }
    }
}


10个消费者去消费名称为history的队列,消费者均匀分布在每个队列上(每个队列上绑定了5个),每次抓取32个消息去消费。

59.png


总结


通过rabbitmq-sharding插件,将原本单个队列history的分成了2个队列,但是对消费者来说,还是消费的原来的history队列,而不用管底层实际对应的物理队列。

极大的提高了单个队列在大并发下的吞吐量。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 监控 物联网
MQTT的奇妙之旅:探索RabbitMQ Web MQTT插件的威力【RabbitMQ 十一】
MQTT的奇妙之旅:探索RabbitMQ Web MQTT插件的威力【RabbitMQ 十一】
194 0
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
464 0
|
5月前
|
消息中间件 存储 运维
RabbitMQ插件详解:rabbitmq_message_timestamp【Rabbitmq 五】
RabbitMQ插件详解:rabbitmq_message_timestamp【Rabbitmq 五】
113 1
|
5月前
|
消息中间件 存储 负载均衡
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
分布式消息传递新时代:深入了解RabbitMQ_sharding插件的精髓【RabbitMQ 八】
102 0
|
5月前
|
消息中间件 监控 前端开发
RabbitMQ插件详解:rabbitmq_web_stomp【RabbitMQ 六】
RabbitMQ插件详解:rabbitmq_web_stomp【RabbitMQ 六】
276 0
|
5月前
|
消息中间件
rabbitmq插件升级-延时队列为列
rabbitmq插件升级-延时队列为列
655 15
|
5月前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
803 0
|
11月前
|
消息中间件
RabbitMq没开启rabbitmq_management插件控制台报错Node statistics not available
RabbitMq没开启rabbitmq_management插件控制台报错Node statistics not available
RabbitMQ实现延迟消息居然如此简单,整个插件就完事了
RabbitMQ实现延迟消息的方式有两种,一种是使用死信队列实现,另一种是使用延迟插件实现。死信队列实现我们以前曾经讲过这次我们讲个更简单的,使用延迟插件实现。
|
5月前
|
消息中间件 Prometheus 监控
探秘RabbitMQ:社区插件与扩展的奇妙世界【RabbitMQ 三】
探秘RabbitMQ:社区插件与扩展的奇妙世界【RabbitMQ 三】
185 0

热门文章

最新文章

下一篇
无影云桌面