SpringBoot整合RabbitMQ及其原理分析

简介: SpringBoot整合RabbitMQ及其原理分析

上一篇:RabbitMQ基础知识

1、相关依赖

这里无需指定版本号,让其跟着SpringBoot版本走。本示例使用SpringBoot版本号为2.7.10。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>

2、生产者、消费者

创建两个SpringBoot应用,模拟消息生产者与消费者【publisher、consumer】。

2-1生产者

编写配置文件,用户名和密码等自行修改   这里虚拟机的名称是上一篇文章中新建的。

server.port=8082
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=用户名
#密码
spring.rabbitmq.password=密码
#配置虚拟机
spring.rabbitmq.virtual-host=demo

声明交换机、队列并绑定:

@ConfigurationpublicclassRabbitMqConfig {
@BeanpublicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) {
RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
returnrabbitTemplate;
    }
@BeanpublicMessageConverterjackson2JsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
@BeanpublicDirectExchangegetExchange(){
returnnewDirectExchange("directExchange",false,false);
    }
@BeanpublicQueuegetQueue(){
returnnewQueue("publisher.addUser",true,false,false);
    }
@BeanpublicBindinggetBinding(DirectExchangeexchange,Queuequeue){
returnBindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }
}

新建User实体类

@DatapublicclassUser {
privateLongid;
privateStringname;
privateStringdesc;
}

在方法中使用RabbitTemplate来发送消息:

publicinterfacePublisherService {
/*** 添加用户* @param user 用户信息*/voidaddUser(Useruser);
}
@RequiredArgsConstructor@ServicepublicclassPublisherServiceImplimplementsPublisherService{
privatefinalRabbitTemplaterabbitTemplate;
@OverridepublicvoidaddUser(Useruser) {
rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user);
    }
}

以上需要注意的就是交换机的名称队列名routingKey。示例中使用的是直连交换机,routingKey需要和队列名保持一致。不懂的可以查看上一篇文章。

controller:

@RequiredArgsConstructor@RestController@RequestMapping("/user")
publicclassUserController {
privatefinalPublisherServicepublisherService;
@PostMapping("/add")
publicvoidadd(){
Useruser=newUser();
user.setId(1000L);
user.setName("黄忠");
user.setDesc("老兵不死,只是逐渐凋零");
publisherService.addUser(user);
    }
}

2-2消费者

消费者的配置和生产者一样,不赘述了,直接看代码:

@Service@Slf4jpublicclassConsumerService {
@RabbitListener(queues="publisher.addUser")
publicvoidaddUser(StringuserStr){
Useruser=JSONObject.parseObject(userStr,User.class);
log.info(user.toString());
    }
}

@RabbitListener 注解是指定某方法作为消息消费的方法,指定队列名称。@RabbitListener 如果标注在类上,需配合 @RabbitHandler 注解一起使用,根据接受的参数类型进入具体的方法中

2-3测试

消费端在启动时可能会报找不到交换机或队列,只需要让生产者发送一次消息,从控制台就可以看到相关的交换机和队列等信息了。

可以看到消费者成功消费了消息:

3、消费流程

通过上述操作,我们已经会简单地使用RabbitMQ了,接下来了解一下它的整个流程。如此可以让我们掌握的更牢固。

生产者:

  • 生产者连接到Message Broker【也就是RabbitMQ服务】,建立一个连接( Connection)开启一个信道(Channel)。
  • 生产者声明一个交换机,并设置相关属性,比如交换机类型、是否持久化等。
  • 生产者声明一个队列并设置相关属性。
  • 生产者通过路由键【Routing Key】将交换机和队列绑定。
  • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
  • 相应的交换机根据接收到的路由键查找相匹配的队列。
  • 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  • 关闭信道。
  • 关闭连接。

消费者:

  • 消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel) 。
  • 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
  • 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
  • 消费者确认(ack) 接收到的消息。
  • RabbitMQ 从队列中删除相应己经被确认的消息。
  • 关闭信道。
  • 关闭连接。
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
Java Spring 容器
SpringBoot自动配置的原理是什么?
Spring Boot自动配置核心在于@EnableAutoConfiguration注解,它通过@Import导入配置选择器,加载META-INF/spring.factories中定义的自动配置类。这些类根据@Conditional系列注解判断是否生效。但Spring Boot 3.0后已弃用spring.factories,改用新格式的.imports文件进行配置。
728 0
|
5月前
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
2月前
|
前端开发 Java 数据库连接
SpringBoot参数校验底层原理和实操。深度历险、深度解析(图解+秒懂+史上最全)
SpringBoot参数校验底层原理和实操。深度历险、深度解析(图解+秒懂+史上最全)
SpringBoot参数校验底层原理和实操。深度历险、深度解析(图解+秒懂+史上最全)
|
4月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
4月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
175 32
|
2月前
|
XML 人工智能 IDE
Springboot整合SSMP报错分析
本文介绍了Springboot整合SSMP框架时常见的报错及解决方案,包括MyBatis-Plus版本不兼容导致的Lambda表达式条件构造器报错及表名不匹配问题。通过升级或降级MyBatis-Plus版本、使用@TableName注解或配置table-prefix属性,可有效解决上述问题,帮助开发者避免在整合SSMP时出现不必要的错误。
164 0
|
5月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
1343 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
5月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
205 11
RocketMQ原理—3.源码设计简单分析下
|
5月前
|
存储 消息中间件 网络协议
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理
|
5月前
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳

热门文章

最新文章