RabiitMQ的五种模式

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RabiitMQ的五种模式

1.simple简单模式

一个生产者一个消费者

无需交换机,RabbitMQ会通过默认的交换机将消息投递到指定的队列,这是一种Direct类型的交换机,队列与它绑定时的binding key就是队列的名称

2.work模式

一个生产者,多个消费者

多个消费者监听同一个队列

消费者1:

public class Recv {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一时刻服务器只会发一条消息给消费者
        //channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,false表示手动返回完成状态,true表示自动
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态,注释掉表示使用自动确认模式
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消费者2:

public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一时刻服务器只会发一条消息给消费者
        //channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,false表示手动返回完成状态,true表示自动
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);
            //下面这行注释掉表示使用自动确认模式
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

生产者,生产100条消息

public class Send {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}

结果:一个消息只能被消费一次,Recv1消费的是基数消息,Recv2消费的是偶数消息

存在问题:

Recv1线程停顿的时间短,应该消费更多的消息,RabbitMQ默认将消息顺序发送给下一个消费者,这样,每个消费者消费的消息都是一样的,轮询分发消息

如何按照消费者的能力分配消息:联合使用Qos和Acknowledge,basicQos方法设置了当前信道最大欲获取(prefetch)消息数量为1,消息从队列异步推送给消费者,消费者的ack也是异步发送给队列,从队列的角度看,总由一批消息已近推送但是未收到ack确认,Qos的prefetchCount参数就是用来限制这批未确认消息数量的,设置为1的时候,队列只有再消费者发回的上一条消息ack确认之后才会继续推送消息,prefetchCount的默认值是0,所以会一直推送消息

轮询分发

默认情况下,RabbitMQ将逐个发送消息给在序列的下一个消费者,不考虑任务的时长,一次性分配,一个一个按序分配

公平分发

轮询的时候,可能所有的奇数都是很复杂的消息,偶数是简单的消息,这样对消费者的待遇不公平,引入公平分发,使用BasicQos(prefetchCount = 1)方法,限制队列只发一条消息给同一个消费者,只有收到ack确认之后再发送第二次,使用公平分发,必须关闭自动应答,改为手动应当

2.1消息的确认模式

队列推送消息如何知道消息成功被消费了

模式1:自动确认,推送之后,无论是否什么结果都判定为成功消费

模式2:手动确认,推送之后,队列将消息标记为不可用状态,如果一直没有反馈,该消息一直处于不可用状态

3.Publish/Subscribe发布订阅模式

1个生产者,多个消费者

每个消费者都有自己的队列

生产者将消息发到交换机

每个队列都绑定交换机

生产者发送的消息经过交换机到达队列,一个消息可以被多个消费者消费

注意:一个消费者队列可以有多个消费者实例,但是只有一个消费者实例会消费,消费发送到交换机后消息会消失,因为交换机只能转发消息,没有存储功能

4.Routing路由模式

路由模式下,对应使用的交换机是Direct交换机,生产者发送消息时需要指定routing key(和发布订阅模式的区别),交换机会根据routing key将消息投递到指定的队列

5.Topic主题模式

Topic主题模式采用的是Topic类型的交换机,因此是支持模糊匹配,消息能被投递到一个或多个队列中。生产者发送消息时指定routing key,Topic类型的交换机会根据routing key找到所有符合队列与交换机绑定时指定的binding key规则的队列,并将消息投递到那些队列中。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
3月前
|
设计模式 运维 安全
边车模式的介绍
边车模式的介绍
35 0
|
3月前
一般模式
【2月更文挑战第20天】一般模式。
22 1
|
3月前
|
设计模式 算法 编译器
【C/C++ PIMPL模式 】 深入探索C++中的PIMPL模式
【C/C++ PIMPL模式 】 深入探索C++中的PIMPL模式
143 0
|
分布式计算 自然语言处理 并行计算
运用Aggregator模式实现MapReduc
运用Aggregator模式实现MapReduc
运用Aggregator模式实现MapReduc
|
前端开发 JavaScript Java
MVX模式是什么?
MVX模式是什么?
253 0
使用不完整的模式
使用不完整的模式
69 0
|
C语言
模式
模式
126 0
|
并行计算 搜索推荐 算法
|
前端开发 JavaScript 编译器
模式二之框架模式
模式二之框架模式