RabbitMQ消息模型之Routing-Topic

简介: RabbitMQ消息模型之Routing-Topic

Routing Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insertitem.#item.*

统配符

* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
# (hash) can substitute for zero or more words.  匹配[0-n]个词
a.#   可以匹配 a.b、a.b.c、a.b.c.d 等只要是a.开头的情况
a.*   只能匹配 a.b 这种后面只有一个单词的情况

通配符可以出现在

注意:RoutingKey的任意位置。

创建生产者

public class MyProducer {
    @Test
    public void test() throws Exception {
        // 交换机
        String exchange = "logs_topic";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "topic");
            // 发布消息
            channel.basicPublish(exchange, "a.b", null, "a.b".getBytes());
            channel.basicPublish(exchange, "a.b.c", null, "a.b.c".getBytes());
            channel.basicPublish(exchange, "a.b.c.d", null, "a.b.c.d" .getBytes());
            channel.basicPublish(exchange, "a.b.c.d.e", null, "a.b.c.d.e".getBytes());
    }
}

创建消费者1

public class MyConsumer1 {
    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_topic";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "topic");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "a.*");
        channel.queueBind(queue, exchange, "#.d.#");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {
    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_topic";
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "topic");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "#.b.#");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

生产者生产的消息:a.ba.b.ca.b.c.da.b.c.d.e

消费者1接受的消息规则为:

channel.queueBind(queue, exchange, "a.*");
channel.queueBind(queue, exchange, "#.d.#");

所以消费者1将会接收到:a.ba.b.c.da.b.c.d.e

消费者2接受的消息规则为:

channel.queueBind(queue, exchange, "#.b.#");

所以消费者2将会接收到:a.ba.b.ca.b.c.da.b.c.d.e


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件
RabbitMQ消息模型之Routing-Direct
RabbitMQ消息模型之Routing-Direct
27 1
|
2月前
|
消息中间件
RabbitMQ消息模型之发布订阅Publish-Subscribe
RabbitMQ消息模型之发布订阅Publish-Subscribe
29 0
RabbitMQ消息模型之发布订阅Publish-Subscribe
|
12月前
|
消息中间件 存储 Java
【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)
【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)
237 1
|
2月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
9月前
|
消息中间件 存储 Java
RabbitMQ之topic(主题)Exchange解读
RabbitMQ之topic(主题)Exchange解读
|
12月前
|
消息中间件 应用服务中间件 nginx
【RabbitMQ六】——RabbitMQ主题模式(Topic)
【RabbitMQ六】——RabbitMQ主题模式(Topic)
218 1
|
12月前
|
消息中间件 存储
【RabbitMQ五】——RabbitMQ路由模式(Routing)
【RabbitMQ五】——RabbitMQ路由模式(Routing)
177 0
|
消息中间件
七、RabbitMQ 之 SpringAMQP 实现 Topic 交换机
七、RabbitMQ 之 SpringAMQP 实现 Topic 交换机
|
消息中间件 存储 JSON
【RabbitMQ】Fanout、Direct、Topic、消息转换器
【RabbitMQ】Fanout、Direct、Topic、消息转换器
241 0
【RabbitMQ】Fanout、Direct、Topic、消息转换器
|
消息中间件 存储
【RabbitMQ】——三种Exchange模式(Fanout、Direct、Topic)
【RabbitMQ】——三种Exchange模式(Fanout、Direct、Topic)
265 0
【RabbitMQ】——三种Exchange模式(Fanout、Direct、Topic)