【Spring Boot实战与进阶】集成RabbitMQ的实例详解

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: RabbitMQ是采用 Erlang语言实现AMQP协议的消息中间件,AMQP全称是 Advanced Message Queue Protocolo,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
Spring Boot是很优秀的框架,它的出现简化了新Spring应用的初始搭建以及开发过程,大大减少了代码量,目前已被大多数企业认可和使用。这个专栏将对Spring Boot框架从浅入深,从实战到进阶,不但我们要懂得如何去使用,还要去剖析框架源码,学习其优秀的设计思想。

汇总目录链接:【Spring Boot实战与进阶】学习目录

一、简介

   RabbitMQ是采用 Erlang语言实现AMQP协议的消息中间件,AMQP全称是 Advanced Message Queue Protocolo,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
在这里插入图片描述
常用的交换机有以下三种:

1、Direct Exchange(直连型交换机)

根据消息携带的路由键(routing key)将消息投递给对应队列,direct exchange 适用于消息的单播发送。
工作流程如下:

  • 将一个队列绑定到某个交换机上,同时赋予该绑定 一个 route key。
  • 当一个携带 route key为R 的消息被发送到 direct exchange 时,exchange 会将消息路由到 绑定值同样为 R 的队列。注意Route Key和绑定值要完全匹配才行

direct exchange 经常用于在 多个 worker 中分配任务,当这样做时,需注意,在AMQP 0-9-1中,消息的负载均衡发生在 consumer之间,而不是在 queue之间。
在这里插入图片描述

2、Fanout Exchange(扇型交换机)
  一个 fanout exchange 会将消息分发给所有绑定到此 exchange 的queue中,不管 queue中的 route key。如果有 N 个 Queue 绑定到 一个 fanout exchange 时,那么此时 exchange 收到消息时,会将此消息分发到 这 N 个 queue中,由于此性质, fanout exchange 也常用消息的广播。
在这里插入图片描述

3、Topic Exchange(主题交换机)

  topic exchange 会根据 route key 将消息分发到与此消息的 route key 相匹配的并且绑定此exchange的一个或多个 queue。这里的相匹配与 direct exchange的完全匹配的路由规则不一样,topic exchange 在匹配规则上进行了扩展,规则如下:

  • RoutingKey(路由键)为一个点号 "." 分隔的字符串,如 "com.rabbitmq.client"、"java.util.concurrent"、"com.hidden.client"等
  • BindingKey(绑定键) 和 RoutingKey一样也是点号 "." 分隔的字符串
  • BindingKey (绑定键) 中可以存在两种 特殊字符串 "" 和 "#" ,用于做模糊匹配,其中 " # " 用于匹配一个单词," "用于匹配多个单词

  topic exchange 经常用于实现 publish/subscribe模型,即消息的多播模型。这里的Topic Exchange就适用于发布/订阅模型。RabbitMQ的一个原则就是,消息不能直接投递到 Queue中,必须先将消息投递到 Exchange中,然后由Exchange 按照路由规则将消息投递到对应的 Queue中。
在这里插入图片描述

二、集成RabbitMQ的简单例子

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    password: guest
    username: guest

3、创建RabbitMQ配置类和消息队列

@Configuration
public class RabbitConfig {

    @Bean
    public Queue getQueue() {
        return new Queue("QA");
    }
}

4、创建生产者

@Component
public class MsgProducer {

    @Resource
    private AmqpTemplate rabbitTemplate;

    public void send() {
        rabbitTemplate.convertAndSend("QA", "这是一条最新消息");
    }
}

5、创建消费者

@Component
@RabbitListener(queues = "QA")
public class MsgConsumer1 {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("【消费者接收的消息】: " + msg);
    }

}

6、创建测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Resource
    private MsgProducer msgProducer;

    // 发送单条消息
    @Test
    public void contextLoads() {
        msgProducer.send();
    }
}

7、运行项目

控制台输出:

【消费者接收的消息】: 这是一条最新消息

RabbitMQ后台管理端:
在这里插入图片描述

三、项目实战

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    password: guest
    username: guest

mq:
  email:
    queue-name: email-queue
    exchange-name: email-exchange
    routing-key-name: email-routing-key

3、创建RabbitMQ配置类和消息队列

@Configuration
@Slf4j
public class RabbitMQConfig {
    @Resource
    private CachingConnectionFactory connectionFactory;

    @Value("${mq.email.queue-name}")
    private String queueName;
    @Value("${mq.email.exchange-name}")
    private String exchangeName;
    @Value("${mq.email.routing-key-name}")
    private String routingKeyName;

    /**
     * 单一消费者
     *
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功: correlationData({}), ack({}), cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失: exchange({}), route({}), replyCode({}), replyText({}), message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(queueName, true);
    }

    @Bean
    public DirectExchange emailExchange() {
        return new DirectExchange(exchangeName, true, false);
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(routingKeyName);
    }
}

4、MsgConsumerListener(消费者)

@Component
public class MsgConsumerListener {
    @RabbitListener(queues = "${mq.email.queue-name}", containerFactory = "singleListenerContainer")
    public void process(String msg) {
        System.out.println("【消费者接收的消息】: " + msg);
    }
}

5、TestController(消息生产和推送的接口)

@RestController
public class TestController {

    @Value("${mq.email.exchange-name}")
    private String exchangeName;

    @Value("${mq.email.routing-key-name}")
    private String routingKeyName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/test")
    public void test() {
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setRoutingKey(routingKeyName);
        rabbitTemplate.convertAndSend("我是队列里的一条消息");
    }
}

6、调用接口推送消息

调用接口http://localhost:8080/test

控制台输出:

消息发送成功: correlationData(null), ack(true), cause(null)
【消费者接收的消息】: 我是队列里的一条消息
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
65 3
|
3月前
|
消息中间件 Java 网络架构
|
30天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
1月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
2月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
169 0
|
3月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
36 1
|
3月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
779 2
|
3月前
|
消息中间件 Java Maven
|
3月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
66 0