SpringBoot整合RabbitMQ

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: SpringBoot整合RabbitMQ

RabbitMQ部署指北

下载镜像

docker pull rabbitmq:3.8-management

执行下面的命令来运行MQ容器:

docker run \

-e RABBITMQ_DEFAULT_USER=itcast \

-e RABBITMQ_DEFAULT_PASS=zhangbo123456* \

-v mq-plugins:/plugins \

--name mq \

--hostname mq1 \

-p 15672:15672 \

-p 5672:5672 \

-d \

rabbitmq:3.8-management

什么是消息队列

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

RabbitMQ快速入门

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com

SpringAMQP

1,Basic Queue 简单队列模型

2,Work Queue 工作队列模型

3,发布订阅模型 fanout

4,发布订阅模型 Direct

5,发布订阅模型 Topic

6,消息转换器

概念:

AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求

SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

 

AMQP和JMS区别和联系

MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

HelloWorld案例

官方的helloword是基于最基础的消息队列模型来实现的,其中包括三个角色

1,publisher:消息发布者,要将消息发布到队列queue

2,queue:消息队列,负责接收并缓存消息

3,consumer:订阅队列,处理队列中的消息

基本消息队列的消息发送流程

1,建立connection

2,创建channel

3,利用channel声名队列

4,利用channel向队列发送消息

基本消息队列的消息接收流程

1,建立connection

2,创建channel

3,利用channel声名队列

4,定义consumer的消费行为handleDelivery

5,利用channel将消费者与队列绑定

快速开始

第一步导入依赖

<dependency>

   <groupId>org.springframework.boot</groupId>

   <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

第二步编写配置文件

1. spring:
2.   rabbitmq:
3.     host: 47.99.139.160  #主机
4.     port: 5672   #端口号
5.     virtual-host: /    #虚拟主机
6.     username: itcast   #用户名
7.     password: zhangbo123456*   #密码

第三步编写测试方法

1. @Autowired
2. RabbitTemplate rabbitTemplate;
3. 
4. @Test
5. void contextLoads() {
6. String queueName = "simple.queue";
7. String message = "hello , spring amqp";
8.     rabbitTemplate.convertAndSend(queueName,message);
9. }

小注:这个消息不会 创建队列,所以要手动创建队列

第四步在Consumer中编写消费逻辑,监听队列

1. @Component
2. public class SpringRabbitListener {
3. 
4. @RabbitListener(queues = "simple.queue")
5. public void listenSimplateQueueMessage(String msg) throws InterruptedException{
6.         System.out.println("spring消费者接收到消息:"+msg);
7.     }
8. }

 

消息预取限制

修改application.yml,设置preFetch这个值,可以控制预取消息的上线

spring:

 rabbitmq:

   host: 47.99.139.160  #主机

   port: 5672   #端口号

   virtual-host: /    #虚拟主机

   username: itcast   #用户名

   password: zhangbo123456*   #密码

   listener:

     simple:

       prefetch: 1  #每次只能获取一条消息,处理完成才能获取下一条消息

发布 订阅

发布订阅模式允许将同一消息发送个多个消费者,实现方式是加入了exchange

常见exchange类型包括

  • Fanout:广播
  • Direct:路由
  • Topic:话题

发布订阅-Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue(可以用于实现广播模式)

实现思路:

1,在consumer服务中,利用代码声明队列,交换机,并将两者绑定

2,在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

3,在publisher中编写测试方法,向itcast.fanout发送消息

步骤一 :在consumer服务声名exchange,queue,binding,在consumer服务声名一个配置类,添加@Configuration注解,并声明FanoutExchange,queue和绑定关系对象binding

1. @Configuration
2. public class FanoutConfig {
3. 
4. //声名FanoutChange交换机
5. @Bean
6. public FanoutExchange fanoutExchange(){
7. return new FanoutExchange("itcast.fanout");
8.     }
9. 
10. //声名第一个队列
11. @Bean
12. public Queue fanoutQueue1(){
13. return new Queue("fanout.queue1");
14.     }
15. 
16. //绑定队列一和交换机
17. @Bean
18. public Binding bindingQueue1(Queue fanoutQueue1 , FanoutExchange fanoutExchange){
19. return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
20.     }
21. //...略,以相同的方式声名第二个队列,并完成绑定
22. }

consumer代码

1. //fanout 模式
2. @RabbitListener(queues = "fanout.queue1")
3. public void listenFanoutQueueMessage(String msg) throws InterruptedException{
4.         System.out.println("spring消费者接收到fanout.queue1消息:"+msg);
5.     }
6. 
7. //fanout 模式
8. @RabbitListener(queues = "fanout.queue2")
9. public void listenFanoutQueueMessage2(String msg) throws InterruptedException{
10.         System.out.println("spring消费者接收到fanout.queue2消息:"+msg);
11.     }

publisher代码

1. //fanout 模式
2. @Test
3. public void testSendFanoutExchange(){
4. //交换机名称
5. String exchangeName = "itcast.fanout";
6. //消息
7. String message = "hello , every one";
8. //发送消息
9.         rabbitTemplate.convertAndSend(exchangeName,"",message);
10.     }

总结:

交换机的作用?

1,接收publisher发送的消息

2,将消息按照路由规则路由到与之绑定的队列

3,不能缓存消息,路由失败,消息丢失

4,FanoutExchange的会将消息路由到每个绑定的队列

声名队列,交换机,绑定关系的bean是什么?

  • queue
  • fanoutExchange
  • Binding

 

发布订阅-DirectExchange

Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此称之为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例实现思路

1,利用@RabbitListener声名Exchange,Queue,RoutingKey

2,zaiconsumer服务中,编写两个消费者方法,分别监听direct.queue和direct.queue2

3,在publisher中编写测试方法,向itcast.direct发送消息

consumer

1. //direct模式
2. @RabbitListener(bindings = @QueueBinding(
3.             value = @Queue(name = "direct.queue1"),
4.             exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
5.             key = {"red","blue"}
6.     ))
7. public void listenDirectQueue(String msg){
8.         System.out.println("spring消费者接收到direct.queue1消息:"+msg);
9.     }
10. 
11. //direct模式
12. @RabbitListener(bindings = @QueueBinding(
13.             value = @Queue(name = "direct.queue2"),
14.             exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
15.             key = {"red","yellow"}
16.     ))
17. public void listenDirectQueue2(String msg){
18.         System.out.println("spring消费者接收到direct.queue2消息:"+msg);
19.     }

publisher

1. //direct 模式
2. @Test
3. public void testSendDirectExchange(){
4. //交换机名称
5. String exchangeName = "itcast.direct";
6. //消息
7. String message = "hello , smoky";
8. //发送消息  参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息
9.         rabbitTemplate.convertAndSend(exchangeName,"smoky",message);
10.     }

总结:

描述direct交换机和fanout交换机的差异?

fanout交换机将消息发送给每一个与之绑定的队列

directii交换机根据RoutingKey判断路由给那个队列

如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声名队列和交换机有哪些常见注解?

@Queue

@Exchange

 

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

Queue与Exchange指定BIndingKey时可以指定通配符

#:代指0个或多个单词

*:代指一个单词

案例实现思路

1,利用@RabbitListener声名Exchange Queue RoutingKey

2,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

3,在publisher中编写测试方法,向itcast.topic发送消息

consumer

1. //topic模式
2. @RabbitListener(bindings = @QueueBinding(
3.             value = @Queue("topic.queue1"),
4.             exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
5.             key = "chain.#"
6.     ))
7. public void listenTopictQueue1(String msg){
8.         System.out.println("spring消费者接收到topic.queue1消息:"+msg);
9.     }
10. 
11. //topic模式
12. @RabbitListener(bindings = @QueueBinding(
13.             value = @Queue("topic.queue2"),
14.             exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
15.             key = "*.news"
16.     ))
17. public void listenTopictQueue2(String msg){
18.         System.out.println("spring消费者接收到topic.queue2消息:"+msg);
19.     }

publisher

1. //direct 模式
2. @Test
3. public void testSendTopictExchange(){
4. //交换机名称
5. String exchangeName = "itcast.topic";
6. //消息
7. String message = "今天天气很好呀";
8. //发送消息  参数分别是:交换机名称 RoutingKey(暂时为空,路由key),消息
9.         rabbitTemplate.convertAndSend(exchangeName,"chain.weather",message);
10.     }

测试发送Object类型消息,消息转换器

说明:在SpringAMQP的发送方法中,接收到的消息类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送,用的jdk的序列化器

补充: 使用jdk的序列化器的缺点:1,性能比较差 2,安全性不好,容易出现注入的问题 3,数据长度长,占用额外内存

测试代码

1. //测试Object类型消息
2. @Test
3. public void sendObjectQueue(){
4.         Map<String,Object> msg = new HashMap<>();
5.         msg.put("name","柳岩");
6.         msg.put("age",21);
7.         rabbitTemplate.convertAndSend("object.queue",msg);
8.     }

Spring的对消息对象的处理是由import org.springframework.messaging.converter.MessageConverter;来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需要定义一个MessageConverter类型的bean即可,推荐使用JSON的方式序列化

引入依赖

       <dependency>

           <groupId>com.fasterxml.jackson.core</groupId>

           <artifactId>jackson-databind</artifactId>

       </dependency>

声名一个MessageConverter类型的bean  

1. @Bean
2. public MessageConverter jsonMessageConverter(){
3. return new Jackson2JsonMessageConverter();
4.     }

consumer

引入依赖

<dependency>

           <groupId>com.fasterxml.jackson.core</groupId>

           <artifactId>jackson-databind</artifactId>

       </dependency>

consumer服务定义MessageConverter

1. @Bean
2. public MessageConverter jsonMessageConverter(){
3. return new Jackson2JsonMessageConverter();
4.     }

SpringAMQP中消息的序列化和反序列化是怎么实现的

  • 利用MessageConverter实现的,默认是JDK的序列化
  • 注意发送方接收必须使用相同的MessageConverter

MQ的一些常见问题

1,消息可靠性:如何确保发送的消息至少被消费一次

2,延迟消息问题:如何实现消息的延迟投递

3,高可用问题:如何避免单点的MQ故障而导致的不可用问题

4,消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

 

消息可靠性问题

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • 发送时丢失,
  • 生产者发送的消息未到达exchange
  • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

生产者确认机制

RabbitMq提供了publisher confirm机制避免消息发送到MQ的过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功,结果有两种请求

  • publisher-confirm,发送者确认
  • 消息成功投递到交换机返回ack
  • 消息未投递到交换机,返回nack
  • publisher-return
  • 消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突

消费者确认

RabbitMQ支持消费者确认机制,即消费者成功处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息,

而SpringAMQP允许配置三种确认模式

  • manual:手动ack,需要在业务代码结束后,调用api发送ack
  • auto:自动ack,由spring检测listener代码是否出现异常,没有异常则返回ack,抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后会立即被删除
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java 网络架构
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
766 2
|
3月前
|
消息中间件 Java Maven
|
4月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
317 1
|
4月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
4月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
4月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
221 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
120 0