Springboot整合Rabbitmq,Direct、Fanout、Topic

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: Springboot整合Rabbitmq,Direct、Fanout、Topic

安装


https://www.rabbitmq.com/install-windows.html

注意安装Erlang,没有安装的话安装rabbitmq也会提示你跳转安装Erlang

1673405351833.jpg

测试版本:RabbitMQ 3.10.4、Erlang 25.0、springboot2.5.9


搜索RabbitMQ Service - start 启动,


开启可视化管理插件,找到 RabbitMQ 的安装目录,切换到 sbin 文件夹下,打开命令行,输入:

rabbitmq-plugins enable rabbitmq_management

访问http://127.0.0.1:15672,默认的账号和密码都是 guest


介绍:

1673405385822.jpg

常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:


Direct Exchange


直连型交换机,根据消息携带的路由键将消息投递给对应队列。


大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。

然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列, routing key必须相等。


直连交换机是一对一,如果配置多台监听绑定到同一个直连交换的同一个队列会轮询的方式对消息进行消费,而且不存在重复消费。


Fanout Exchange


扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的 所有队列。


Topic Exchange


主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。


topic 的模式匹配包括两个通配符:#和* ,其中 # 匹配 0 或多个单词, * 匹配一个单词 (必须出现的)。


代码测试


pom

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


yml

server:
  port: 8093
spring:
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
    port: 5672
    virtual-host: /


Direct Exchange


创建demo 队列,也可通过管理面板创建,也可以@Bean 来 declare:

import org.springframework.amqp.core.Queue;
@Configuration
public class RabbitConfig {
    @Bean
    public Queue demo() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        return new Queue("demo", true);
    }
}


Producer

import org.springframework.amqp.rabbit.core.RabbitTemplate;
@RestController
@RequestMapping(value = "/msg")
public class HelloAction {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping(value = "/direct")
    public String sayHello(String msg) {
        rabbitTemplate.convertAndSend("demo", msg);
        return "消息发送成功";
    }
}

1673405503568.jpg1673405503568.jpg


Consumer


利用 @RabbitListener 注解接收数据:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
@Component
public class RabbitConsumer {
    @RabbitListener(queues = {"demo"})
    public void consume(Message message, Channel channel) throws IOException {
        System.out.println("接收到消息:" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}


上边用的是 Message 类型,我们可以通过 @Payloa 注解来直接接收 body 数据:

@RabbitListener(queues = {"demo"})
    public void consume(@Payload String body, Channel channel) throws IOException {
        System.out.println("接收到消息:" + body);
    }

1673405543768.jpg

Spring Boot RabbitMQ 中可以使用 @RabbitListener 和 @RabbitHandler 两个注解联合来从同样的接口中接收不同参数类型的数据并处理,比如 String、Object 等。


不过个人建议发送数据的时候直接用 String,如果是对象,可以用 JSON 工具将对象转换为字符串。接收数据时再用 JSON 工具将字符串还原为对象。


在这种情况下,我们使用的默认的 direct exchange,默认的 routing key 就是队列名。

如果需要自定义 exchange,那么就要指定 binding。比如新建一个 direct 类型的 exchange,名为 demoex,然后添加一个 binding 到 demo,这个routing key 是完全匹配 的。


Fanout Exchange


新建exchange:demoex_fanout,三个Queue:pub_demo1, pub_demo2, pub_demo3将这三个队列绑定到这个exchange上,不需要 routing_key。


发送广播消息:

@GetMapping(value = "/broadCast")
    public String broadCast(String msg) {
        // 广播消息到 demoex_fanout 这个 exchange 绑定的所有队列
        // 创建fanout类型exchange:demoex_fanout, 新建2个或以上队列绑定到交换机
        rabbitTemplate.convertAndSend("demoex_fanout", "", msg);
        return "广播成功";
    }

1673405581841.jpg


Topic Exchange


新建Topic exchange,名为 demoex_topic,新建三个队列通过Routing Key绑定到交换机上


pat_demo1: demo.*

pat_demo2: demo.a.#

pat_demo3: demo.a.*.c

1673405594591.jpg

@GetMapping(value = "/topic")
    public String pattern(String msg) {
        // 匹配模式demoex_topic
        // 新建三个队列,Routing Key为:demo.#,demo.a.*,demo.a.*.c
        rabbitTemplate.convertAndSend("demoex_topic", "demo.a.b.c", msg);
        return "模式传播成功";
    }


1673405615853.jpg

1673405615853.jpg

只有pat_demo2,pat_demo3通过Routing Key匹配接收到消息


消息回调


生产者消息确认


回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)

server:
  port: 8093
spring:
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
    port: 5672
    virtual-host: /
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true


RabbitConfig.java

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback:     "+"消息:"+message);
                System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
            }
        });
        return rabbitTemplate;
    }
//    /**
//     * 创建队列 demo
//     *
//     * @author yh
//     * @date 2022/6/4
//     */
//    @Bean
//    public Queue demo() {
//        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
//        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
//        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//        return new Queue("demo", true);
//    }
//    /**
//     * Direct交换机 起名:TestDirectExchange
//     *
//     * @author yh
//     * @date 2022/6/4
//     */
//    @Bean
//    DirectExchange TestDirectExchange() {
//        return new DirectExchange("TestDirectExchange", true, false);
//    }
//
//    /**
//     * 绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
//     *
//     * @author yh
//     * @date 2022/6/4
//     */
//    @Bean
//    Binding bindingDirect() {
//        return BindingBuilder.bind(demo()).to(TestDirectExchange()).with("TestDirectRouting");
//    }
//
//
//    @Bean
//    DirectExchange lonelyDirectExchange() {
//        return new DirectExchange("lonelyDirectExchange");
//    }
}


上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;


先从总体的情况分析,推送消息存在四种情况:


消息推送到server,但是在server里找不到交换机。

2022-06-04 22:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)
ConfirmCallback:     相关数据:null
ConfirmCallback:     确认情况:false
ConfirmCallback:     原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)


结论: ①这种情况触发的是 ConfirmCallback 回调函数。


消息推送到server,找到交换机了,但是没找到队列

ReturnCallback:     消息:(Body:'{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     回应码:312
ReturnCallback:     回应信息:NO_ROUTE
ReturnCallback:     交换机:lonelyDirectExchange
ReturnCallback:     路由键:TestDirectRouting
ConfirmCallback:     相关数据:null
ConfirmCallback:     确认情况:true
ConfirmCallback:     原因:null


这种情况,两个函数都被调用了;

这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true;

而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。


结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。


消息推送到sever,交换机和队列啥都没找到

3和1看似类似,实际情况回调是一致的

这种情况触发的是 ConfirmCallback 回调函数


消息推送成功

ConfirmCallback:     相关数据:null
ConfirmCallback:     确认情况:true
ConfirmCallback:     原因:null


结论: ④这种情况触发的是 ConfirmCallback 回调函数。


消费者消息确认机制


三种模式


自动确认, 这也是默认的消息确认情况。 AcknowledgeMode.NONE


RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。


所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。

一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。


根据情况确认, 这个不做介绍


手动确认, 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。

消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。


basic.ack用于肯定确认

basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)

basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息


消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。

而basic.nack,basic.reject表示没有被正确处理:


着重讲下 reject,因为有时候一些场景是需要重新入列的。

channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。


使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。


但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。


最后简单讲讲 nack,这个也是相当于设置不消费某条消息。

channel.basicNack(deliveryTag, false, true);

第一个参数依然是当前消息到的数据的唯一id;

第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。

第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。


同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java 网络架构
|
2月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
224 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
1月前
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
93 2
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
70 2
|
4月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
4月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
5月前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
225 0