RabbitMQ消息模型之Routing-Direct

简介: RabbitMQ消息模型之Routing-Direct

Routing Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列。
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息。
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息。

创建生产者

public class MyProducer {
    @Test
    public void test() throws Exception {
        // 交换机
        String exchange = "logs_direct";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "direct");
        for (int i = 0; i < 3; i++) {
            // 发布消息
            channel.basicPublish(exchange, "DEBUG", null, ("DEBUG LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "INFO", null, ("INFO LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "WARN", null, ("WARN LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "ERROR", null, ("ERROR LOG -> " + i).getBytes());
        }
    }
}

创建消费者1

public class MyConsumer1 {
    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_direct";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "direct");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "WARN");
        channel.queueBind(queue, exchange, "ERROR");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {
    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_direct";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "direct");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "DEBUG");
        channel.queueBind(queue, exchange, "INFO");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件
RabbitMQ消息模型之Work Queues
RabbitMQ消息模型之Work Queues
51 1
RabbitMQ消息模型之Work Queues
|
7月前
|
消息中间件
RabbitMQ消息模型之Routing-Topic
RabbitMQ消息模型之Routing-Topic
46 0
|
7月前
|
消息中间件 缓存
RabbitMQ消息模型之Sample
RabbitMQ消息模型之Sample
40 0
|
7月前
|
消息中间件
RabbitMQ消息模型之发布订阅Publish-Subscribe
RabbitMQ消息模型之发布订阅Publish-Subscribe
87 0
RabbitMQ消息模型之发布订阅Publish-Subscribe
|
7月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
172 0
|
4月前
|
消息中间件 测试技术 Kafka
Apache RocketMQ 批处理模型演进之路
RocketMQ 早期批处理模型存在一定的约束条件,为进一步提升性能,RocketMQ 进行了索引构建流水线改造,同时 BatchCQ 模型和 AutoBatch 模型也优化了批处理流程,提供了更简便的使用体验,快点击本文查看详情及配置展示~
19772 79
|
3月前
|
消息中间件 存储 缓存
RabbitMQ:WorkQueues模型
RabbitMQ:WorkQueues模型
47 8
RabbitMQ:WorkQueues模型
|
7月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
7月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
7月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
197 0