【Spring Boot实战与进阶】集成RabbitMQ的实例详解

简介: RabbitMQ是采用 Erlang语言实现AMQP协议的消息中间件,AMQP全称是 Advanced Message Queue Protocolo,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
Spring Boot是很优秀的框架,它的出现简化了新Spring应用的初始搭建以及开发过程,大大减少了代码量,目前已被大多数企业认可和使用。这个专栏将对Spring Boot框架从浅入深,从实战到进阶,不但我们要懂得如何去使用,还要去剖析框架源码,学习其优秀的设计思想。

汇总目录链接:【Spring Boot实战与进阶】学习目录

一、简介

   RabbitMQ是采用 Erlang语言实现AMQP协议的消息中间件,AMQP全称是 Advanced Message Queue Protocolo,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
在这里插入图片描述
常用的交换机有以下三种:

1、Direct Exchange(直连型交换机)

根据消息携带的路由键(routing key)将消息投递给对应队列,direct exchange 适用于消息的单播发送。
工作流程如下:

  • 将一个队列绑定到某个交换机上,同时赋予该绑定 一个 route key。
  • 当一个携带 route key为R 的消息被发送到 direct exchange 时,exchange 会将消息路由到 绑定值同样为 R 的队列。注意Route Key和绑定值要完全匹配才行

direct exchange 经常用于在 多个 worker 中分配任务,当这样做时,需注意,在AMQP 0-9-1中,消息的负载均衡发生在 consumer之间,而不是在 queue之间。
在这里插入图片描述

2、Fanout Exchange(扇型交换机)
  一个 fanout exchange 会将消息分发给所有绑定到此 exchange 的queue中,不管 queue中的 route key。如果有 N 个 Queue 绑定到 一个 fanout exchange 时,那么此时 exchange 收到消息时,会将此消息分发到 这 N 个 queue中,由于此性质, fanout exchange 也常用消息的广播。
在这里插入图片描述

3、Topic Exchange(主题交换机)

  topic exchange 会根据 route key 将消息分发到与此消息的 route key 相匹配的并且绑定此exchange的一个或多个 queue。这里的相匹配与 direct exchange的完全匹配的路由规则不一样,topic exchange 在匹配规则上进行了扩展,规则如下:

  • RoutingKey(路由键)为一个点号 "." 分隔的字符串,如 "com.rabbitmq.client"、"java.util.concurrent"、"com.hidden.client"等
  • BindingKey(绑定键) 和 RoutingKey一样也是点号 "." 分隔的字符串
  • BindingKey (绑定键) 中可以存在两种 特殊字符串 "" 和 "#" ,用于做模糊匹配,其中 " # " 用于匹配一个单词," "用于匹配多个单词

  topic exchange 经常用于实现 publish/subscribe模型,即消息的多播模型。这里的Topic Exchange就适用于发布/订阅模型。RabbitMQ的一个原则就是,消息不能直接投递到 Queue中,必须先将消息投递到 Exchange中,然后由Exchange 按照路由规则将消息投递到对应的 Queue中。
在这里插入图片描述

二、集成RabbitMQ的简单例子

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    password: guest
    username: guest

3、创建RabbitMQ配置类和消息队列

@Configuration
public class RabbitConfig {

    @Bean
    public Queue getQueue() {
        return new Queue("QA");
    }
}

4、创建生产者

@Component
public class MsgProducer {

    @Resource
    private AmqpTemplate rabbitTemplate;

    public void send() {
        rabbitTemplate.convertAndSend("QA", "这是一条最新消息");
    }
}

5、创建消费者

@Component
@RabbitListener(queues = "QA")
public class MsgConsumer1 {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("【消费者接收的消息】: " + msg);
    }

}

6、创建测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Resource
    private MsgProducer msgProducer;

    // 发送单条消息
    @Test
    public void contextLoads() {
        msgProducer.send();
    }
}

7、运行项目

控制台输出:

【消费者接收的消息】: 这是一条最新消息

RabbitMQ后台管理端:
在这里插入图片描述

三、项目实战

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置RabbitMQ连接信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    password: guest
    username: guest

mq:
  email:
    queue-name: email-queue
    exchange-name: email-exchange
    routing-key-name: email-routing-key

3、创建RabbitMQ配置类和消息队列

@Configuration
@Slf4j
public class RabbitMQConfig {
    @Resource
    private CachingConnectionFactory connectionFactory;

    @Value("${mq.email.queue-name}")
    private String queueName;
    @Value("${mq.email.exchange-name}")
    private String exchangeName;
    @Value("${mq.email.routing-key-name}")
    private String routingKeyName;

    /**
     * 单一消费者
     *
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功: correlationData({}), ack({}), cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失: exchange({}), route({}), replyCode({}), replyText({}), message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(queueName, true);
    }

    @Bean
    public DirectExchange emailExchange() {
        return new DirectExchange(exchangeName, true, false);
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(routingKeyName);
    }
}

4、MsgConsumerListener(消费者)

@Component
public class MsgConsumerListener {
    @RabbitListener(queues = "${mq.email.queue-name}", containerFactory = "singleListenerContainer")
    public void process(String msg) {
        System.out.println("【消费者接收的消息】: " + msg);
    }
}

5、TestController(消息生产和推送的接口)

@RestController
public class TestController {

    @Value("${mq.email.exchange-name}")
    private String exchangeName;

    @Value("${mq.email.routing-key-name}")
    private String routingKeyName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/test")
    public void test() {
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setRoutingKey(routingKeyName);
        rabbitTemplate.convertAndSend("我是队列里的一条消息");
    }
}

6、调用接口推送消息

调用接口http://localhost:8080/test

控制台输出:

消息发送成功: correlationData(null), ack(true), cause(null)
【消费者接收的消息】: 我是队列里的一条消息
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
154 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
4天前
|
存储 JavaScript 开发工具
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
本次的.HarmonyOS Next ,ArkTS语言,HarmonyOS的元服务和DevEco Studio 开发工具,为开发者提供了构建现代化、轻量化、高性能应用的便捷方式。这些技术和工具将帮助开发者更好地适应未来的智能设备和服务提供方式。
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
|
1月前
|
XML Java API
Spring Boot集成MinIO
本文介绍了如何在Spring Boot项目中集成MinIO,一个高性能的分布式对象存储服务。主要步骤包括:引入MinIO依赖、配置MinIO属性、创建MinIO配置类和服务类、使用服务类实现文件上传和下载功能,以及运行应用进行测试。通过这些步骤,可以轻松地在项目中使用MinIO的对象存储功能。
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
35 6
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
70 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
54 1
|
2月前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
50 2
|
3月前
|
SQL JSON Java
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
这篇文章介绍了如何在Spring Boot项目中整合MyBatis和PageHelper进行分页操作,并且集成Swagger2来生成API文档,同时定义了统一的数据返回格式和请求模块。
91 1
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
|
3月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
280 1