SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)

简介: SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!


Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: snow.com


通配符规则:


#:匹配一个或多个词


*:匹配不多不少恰好1个词


举例:


snow.#:能够匹配snow.com.cn 或者 snow.com


snow.*:只能匹配snow.com


如下图:

解释:


  • Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
  • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news


案例需求:

实现思路如下:


  1. 并利用 @RabbitListener 声明 Exchange、Queue、RoutingKey
  2. 在 consumer 服务中,编写两个消费者方法,分别监听 business.test.topic.queue1 和business.test.topic.queue2
  3. 在 publisher 中编写测试方法,向business.test.topic发送消息


pom


    <dependencies>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

yml


server:
  port: 8080
spring:
  rabbitmq:
    host: **.***.**.***
    port: 5672
    username: ****
    password: ****


生产者


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: Snow
 * @date: 2023/1/6
 * **************************************************
 * 修改记录(时间--修改人--修改说明):
 */
@RestController
@RequestMapping("/topic")
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/{message}")
    public void send(@PathVariable("message") String message){
        // 交换机名称
        String exchangeName = "business.test.topic";
        // 发送消息
        if(message.contains("china") && message.contains("news")){
            rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
            return;
        }
        if(message.contains("china")){
            rabbitTemplate.convertAndSend(exchangeName, "china.lala", message);
            return;
        }
        if(message.contains("news")){
            rabbitTemplate.convertAndSend(exchangeName, "lalla.news", message);
            return;
        }

    }


}


消费者


import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author: Snow
 * @date: 2023/1/6
 * **************************************************
 * 修改记录(时间--修改人--修改说明):
 */
@Component
public class Consumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "business.test.topic.queue1"),
            exchange = @Exchange(name = "business.test.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "business.test.topic.queue2"),
            exchange = @Exchange(name = "business.test.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 Java
【RabbitMQ】-SpringAMQP以及Work模型
RabbitMQ的工作模型通过消息队列和消费者的并行处理,极大地提高了任务处理的效率。通过Spring AMQP可以方便地与RabbitMQ进行交互,实现高效的消息传递和任务处理。本文详细介绍了如何配置和使用Spring AMQP来实现RabbitMQ的工作模型,包括生产者、消费者的定义以及消息的发送和接收过程。
79 14
|
9月前
|
消息中间件 Java 网络架构
|
5月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
84 6
|
8月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
588 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
8月前
|
消息中间件 存储 Java
SpringCloud基础4——RabbitMQ和SpringAMQP
消息队列MQ、RabbitMQ、SpringAMQP高级消息队列协议、发布/订阅模型、fanout、direct、topic模式
SpringCloud基础4——RabbitMQ和SpringAMQP
|
7月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
83 0
rabbitmq基础教程(ui,java,springamqp)
|
9月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
9月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1461 3
|
9月前
|
消息中间件 Java Maven
|
7月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
588 36