9、RabbitMQ教程-Topic Exchange类型的基本使用demo

简介: 9、RabbitMQ教程-Topic Exchange类型的基本使用demo
上两篇文章中,我们使用Direct Exchange和Fanout Exchange类型推送了消息,但是我们发现这两种类型在某些情况下并不适合,比如:我们MQ中有一百个不同队列订阅了我们的某个生产者,但是在某一次的时候,我们生产者只希望其中一部分对了能接受消息,那么我们的Fanout Exchange直接广播就会造成信息资源浪费,如果我们选择Direct Exchange的时候,那我们要发送给对应的几十个队列就会要不断设置,重复工作很多,也很容易出错。但是要是能够根据我们的队列RoutingKey来模糊匹配是不是很我们的问题就完美解决了啊。本文就是讲解这种模式

使用Topic Exchange根据规则推送一条消息

  • 准备工作

既然我们要根据规则推送给某一批队列,我们要先建立一批队列,指定路由器类型为Topic Exchange,然后我们使用规则定义routing key。我们这里准备了三个队列

在这里插入图片描述

并通过routing key进行绑定到topic类型的Exchange上

在这里插入图片描述

routing key作用解析

routing key的命名是我们Topic Exchange类型的最关键部分,在推送消息的时候,如果是使用topic类型的Exchange可以直接不指定队列,所以这个时候routing key就成为了我们推送消息的关键。为了消息能够更好的推送接受,在这个地方我们肯定要规范的定义routing kye的名称。在我们这里,我们看了总共有三个routing key

  • com.echo.* :当推送消息时routing key为com.echo.任意字符的时候,队列queue_topic1对应的队列一定能接受到消息
  • com.echo.level2 :当推送消息时routing key为com.echo.level2,队列queue_topic2对应的队列才会接受到消息
  • # :当推送消息时routing key为任意值,队列queue_topic2对应的队列都能接受到消息

其中 * 匹配任意一个单词,# 匹配零个或者多个单词。

代码案例

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14 14:35
 */
public class TopicProductTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String ROUTING_KEY = "com.echo.level2";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;


    public static void main(String[] args) throws IOException, TimeoutException {

        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // 和RabbitMQ建立一个链接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 创建一个 type="direct" 、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        // 发送一条持久化的消息: topic hello world !
        String message = "topic hello world !";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // 关闭资源
        channel.close();
        connection.close();

    }

}

按照上面routing key的作用,我们这个实例推送的消息,对应的三个队列应该都会受到消息,我们运行了看看结果

在这里插入图片描述

最终结果和我们的作用域的描述符合
  • 我们还可以使用消费者来验证一下

消费者代码

package com.example.demo;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author tang.sl
 * @date 2021-01-14 15:05
 */
public class TopicConsumerTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic1";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // 和RabbitMQ建立一个链接
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //声明交换机 Fanout模式
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        //进行绑定,指定消费那个队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
        Consumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        //等待回调函数执行完毕之后 关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

运行之后结果一致

在这里插入图片描述

总结

topic能够对指定的一系列或者说一堆的队列发送消息,关键就是靠routing key的*和#这两个通配符的匹配作用

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
消息中间件 存储 数据安全/隐私保护
RabbitMQ使用教程
RabbitMQ使用教程
11 2
|
2月前
|
消息中间件 Linux 开发工具
Linux系统安装RabbitMQ详细教程
Linux系统安装RabbitMQ详细教程
28 0
|
2月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
4月前
|
消息中间件 Java
RabbitMQ中的Exchange是什么?它有哪些类型?
RabbitMQ中的Exchange是什么?它有哪些类型?
34 0
|
5月前
|
消息中间件 数据安全/隐私保护 Docker
百度搜索:蓝易云【Docker安装RabbitMQ docker安装RabbitMQ完整详细教程】
通过按照以上步骤,你应该能够成功在Docker上安装并运行RabbitMQ。请记住,具体步骤可能会因Docker版本和操作系统而有所不同。如果遇到任何问题,可以查阅官方文档或社区寻求更多帮助。
122 0
|
5月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
5月前
|
消息中间件 JSON 运维
spring boot RabbitMq基础教程(三)
spring boot RabbitMq基础教程
75 1
|
5月前
|
消息中间件 存储 运维
spring boot RabbitMq基础教程(二)
spring boot RabbitMq基础教程
44 0
|
5月前
|
消息中间件 存储 Java
spring boot RabbitMq基础教程(一)
spring boot RabbitMq基础教程
42 0
|
5月前
|
消息中间件 存储 NoSQL
【RabbitMQ教程】第八章 —— RabbitMQ - 幂等性、优先级、惰性
【RabbitMQ教程】第八章 —— RabbitMQ - 幂等性、优先级、惰性