SpringBoot整合RabbitMQ之典型应用场景实战一

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。

实战前言
RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。

RabbitMQ 官网拜读
首先,让我们先拜读 RabbitMQ 官网的技术开发手册以及相关的 Features,感兴趣的朋友可以耐心的阅读其中的相关介绍,相信会有一定的收获,地址可见:www.rabbitmq.com/getstarted.…

在阅读该手册过程中,我们可以得知 RabbitMQ 其实核心就是围绕 “消息模型” 来展开的,其中就包括了组成消息模型的相关组件:生产者,消费者,队列,交换机,路由,消息等!而我们在实战应用中,实际上也是紧紧围绕着 “消息模型” 来展开撸码的!

下面,我就介绍一下这一消息模型的演变历程,当然,这一历程在 RabbitMQ 官网也是可以窥览得到的!
MQ1
MQ2
MQ3
上面几个图就已经概述了几个要点,而且,这几个要点的含义可以说是字如其名!
生产者:发送消息的程序
消费者:监听接收消费消息的程序
消息:一串二进制数据流
队列:消息的暂存区/存储区
交换机:消息的中转站,用于接收分发消息。其中有 fanout、direct、topic、headers 四种
路由:相当于密钥/第三者,与交换机绑定即可路由消息到指定的队列!

正如上图所展示的消息模型的演变,接下来我们将以代码的形式实战各种典型的业务场景!

SpringBoot 整合 RabbitMQ 实战
工欲善其事,必先利其器。我们首先需要借助 IDEA 的 Spring Initializr 用 Maven 构建一个 SpringBoot 的项目,并引入 RabbitMQ、Mybatis、Log4j 等第三方框架的依赖。搭建完成之后,可以简单的写个 RabbitMQController 测试一下项目是否搭建是否成功(可以暂时用单模块方式构建)

紧接着,我们进入实战的核心阶段,在项目或者服务中使用 RabbitMQ,其实无非是有几个核心要点要牢牢把握住,这几个核心要点在撸码过程中需要“时刻的游荡在自己的脑海里”,其中包括:
我要发送的消息是什么

我应该需要创建什么样的消息模型:DirectExchange+RoutingKey?TopicExchange+RoutingKey?等
我要处理的消息是实时的还是需要延时/延迟的?
消息的生产者需要在哪里写,消息的监听消费者需要在哪里写,各自的处理逻辑是啥

基于这样的几个要点,我们先小试牛刀一番,采用 RabbitMQ 实战异步写日志与异步发邮件。当然啦,在进行实战前,我们需要安装好 RabbitMQ 及其后端控制台应用,并在项目中配置一下 RabbitMQ 的相关参数以及相关 Bean 组件。

1.RabbitMQ 安装完成后,打开后端控制台应用:http://localhost:15672/ guest guest 登录,看到下图即表示安装成功
MQ4A

2.然后是项目配置文件层面的配置 application.properties

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5

其中,后面三个参数主要是用于“并发量的配置”,表示:并发消费者的初始化值,并发消费者的最大值,每个消费者每次监听时可拉取处理的消息数量。

接下来,我们需要以 Configuration 的方式配置 RabbitMQ 并以 Bean 的方式显示注入 RabbitMQ 在发送接收处理消息时相关 Bean 组件配置其中典型的配置是 RabbitTemplate 以及 SimpleRabbitListenerContainerFactory,前者是充当消息的发送组件,后者是用于管理RabbitMQ监听器 的容器工厂,其代码如下:

@Configuration
    public class RabbitmqConfig {
    private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);
    @Autowired
    private Environment env;
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 单一消费者
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
    /**
     * 多个消费者
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
        return factory;
    }
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }}

RabbitMQ 实战:业务模块解耦以及异步通信
在一些企业级系统中,我们经常可以见到一个执行 function 通常是由许多子模块组成的,这个 function 在执行过程中,需要 同步 的将其代码从头开始执行到尾,即执行流程是 module_A -> module_B -> module_C -> module_D,典型的案例可以参见汇编或者 C 语言等面向过程语言开发的应用,现在的一些 JavaWeb 应用也存在着这样的写法。

而我们知道,这个执行流程其实对于整个 function 来讲是有一定的弊端的,主要有两点:
整个 function 的执行响应时间将很久;
如果某个 module 发生异常而没有处理得当,可能会影响其他 module 甚至整个 function 的执行流程与结果;

整个 function 中代码可能会很冗长,模块与模块之间可能需要进行强通信以及数据的交互,出现问题时难以定位与维护,甚至会陷入 “改一处代码而动全身”的尴尬境地!

故而,我们需要想办法进行优化,我们需要将强关联的业务模块解耦以及某些模块之间实行异步通信!下面就以两个场景来实战我们的优化措施!

场景一:异步记录用户操作日志
对于企业级应用系统或者微服务应用中,我们经常需要追溯跟踪记录用户的操作日志,而这部分的业务在某种程度上是不应该跟主业务模块耦合在一起的,故而我们需要将其单独抽出并以异步的方式与主模块进行异步通信交互数据。

下面我们就用 RabbitMQ 的 DirectExchange+RoutingKey 消息模型也实现“用户登录成功记录日志”的场景。如前面所言,我们需要在脑海里回荡着几个要点:
消息模型:DirectExchange+RoutingKey 消息模型
消息:用户登录的实体信息,包括用户名,登录事件,来源的IP,所属日志模块等信息
发送接收:在登录的 Controller 中实现发送,在某个 listener 中实现接收并将监听消费到的消息入数据表;实时发送接收

首先我们需要在上面的 RabbitmqConfig 类中创建消息模型:包括 Queue、Exchange、RoutingKey 等的建立,代码如下:
MQ5

上图中 env 获取的信息,我们需要在 application.properties 进行配置,其中 mq.env=local
MQ6
此时,我们将整个项目/服务跑起来,并打开 RabbitMQ 后端控制台应用,即可看到队列以及交换机及其绑定已经建立好了,如下所示:
MQ7
MQ8
接下来,我们需要在 Controller 中执行用户登录逻辑,记录用户登录日志,查询获取用户角色视野资源信息等,由于篇幅关系,在这里我们重点要实现的是用MQ实现 “异步记录用户登录日志” 的逻辑,即在这里 Controller 将充当“生产者”的角色,核心代码如下:

@RestController
    public class UserController {
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
    private static final String Prefix="user";
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private UserLogMapper userLogMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private Environment env;
    @RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            //TODO:执行登录逻辑
            User user=userMapper.selectByUserNamePassword(userName,password);
            if (user!=null){
                //TODO:异步写用户日志
                try {
                    UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user));
                    userLog.setCreateTime(new Date());
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name"));
                    rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name"));
                    Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                    message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON); 
                    rabbitTemplate.convertAndSend(message);         
                }catch (Exception e){
                    e.printStackTrace();
                }
                //TODO:塞权限数据-资源数据-视野数据
            }else{
                response=new BaseResponse(StatusCode.Fail);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return response;
    }}

在上面的“发送逻辑”代码中,其实也体现了我们最开始介绍的演进中的几种消息模型,比如我们是将消息发送到 Exchange 的而不是 Queue,消息是以二进制流的形式进行传输等等。当用 postman 请求到这个 controller 的方法时,我们可以在 RabbitMQ 的后端控制台应用看到一条未确认的消息,通过 GetMessage 即可看到其中的详情,如下:
MQ9
MQ10

最后,我们将开发消费端的业务代码,如下:

@Component
    public class CommonMqListener {
    private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class);
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private UserLogMapper userLogMapper;
    @Autowired
    private MailService mailService;
    /**
     * 监听消费用户日志
     * @param message
     */
    @RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeUserLogQueue(@Payload byte[] message){
        try {
            UserLog userLog=objectMapper.readValue(message, UserLog.class);
            log.info("监听消费用户日志 监听到消息: {} ",userLog);
            //TODO:记录日志入数据表
            userLogMapper.insertSelective(userLog);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

将服务跑起来之后,我们即可监听消费到上面 Queue 中的消息,即当前用户登录的信息,而且,我们也可以看到“记录用户登录日志”的逻辑是由一条异于主业务线程的异步线程去执行的:
MQ11
“异步记录用户操作日志”的案例我想足以用于诠释上面所讲的相关理论知识点了,在后续篇章中,由于篇幅限制,我将重点介绍其核心的业务逻辑!

场景二:异步发送邮件
发送邮件的场景,其实也是比较常见的,比如用户注册需要邮箱验证,用户异地登录发送邮件通知等等,在这里我以 RabbitMQ 实现异步发送邮件。实现的步骤跟场景一几乎一致!

  1. 消息模型的创建
    MQ12
  2. 配置信息的创建
    MQ13
  3. 生产端
    MQ14
  4. 消费端
    MQ15

彩蛋:本博文就先介绍RabbitMQ实战的典型业务场景之业务服务模块异步解耦与通信吧,下篇博文将继续讲解RabbitMQ实战在高并发系统的场景的应用记忆消息确认机制跟并发量的配置实战!另外,博主已将RabbitMQ相关技术以及场景实战的相关要点录制成了视频教程,感兴趣小伙伴可以前往学习观看:RabbitMQ实战 !!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
131 6
|
1月前
|
Java API Spring
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中拦截器的入门教程和实战项目场景实现的详细指南。
28 0
springboot学习七:Spring Boot2.x 拦截器基础入门&实战项目场景实现
|
1月前
|
Java API Spring
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
这篇文章是关于Spring Boot 2.x中过滤器的基础知识和实战项目应用的教程。
27 0
springboot学习六:Spring Boot2.x 过滤器基础入门&实战项目场景实现
|
2月前
|
缓存 NoSQL Java
Springboot实战——黑马点评之秒杀优化
【9月更文挑战第27天】在黑马点评项目中,秒杀功能的优化对提升系统性能和用户体验至关重要。本文提出了多项Spring Boot项目的秒杀优化策略,包括数据库优化(如索引和分库分表)、缓存优化(如Redis缓存和缓存预热)、并发控制(如乐观锁、悲观锁和分布式锁)以及异步处理(如消息队列和异步任务执行)。这些策略能有效提高秒杀功能的性能和稳定性,为用户提供更佳体验。
163 6
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
NoSQL Java Redis
Redis6入门到实战------ 八、Redis与Spring Boot整合
这篇文章详细介绍了如何在Spring Boot项目中整合Redis,包括在`pom.xml`中添加依赖、配置`application.properties`文件、创建配置类以及编写测试类来验证Redis的连接和基本操作。
Redis6入门到实战------ 八、Redis与Spring Boot整合
|
3月前
|
Java API UED
【实战秘籍】Spring Boot开发者的福音:掌握网络防抖动,告别无效请求,提升用户体验!
【8月更文挑战第29天】网络防抖动技术能有效处理频繁触发的事件或请求,避免资源浪费,提升系统响应速度与用户体验。本文介绍如何在Spring Boot中实现防抖动,并提供代码示例。通过使用ScheduledExecutorService,可轻松实现延迟执行功能,确保仅在用户停止输入后才触发操作,大幅减少服务器负载。此外,还可利用`@Async`注解简化异步处理逻辑。防抖动是优化应用性能的关键策略,有助于打造高效稳定的软件系统。
75 2
|
3月前
|
JSON Java API
解码Spring Boot与JSON的完美融合:提升你的Web开发效率,实战技巧大公开!
【8月更文挑战第29天】Spring Boot作为Java开发的轻量级框架,通过`jackson`库提供了强大的JSON处理功能,简化了Web服务和数据交互的实现。本文通过代码示例介绍如何在Spring Boot中进行JSON序列化和反序列化操作,并展示了处理复杂JSON数据及创建RESTful API的方法,帮助开发者提高效率和应用性能。
172 0
|
3月前
|
SQL Java 数据库连接
Spring Boot联手MyBatis,打造开发利器:从入门到精通,实战教程带你飞越编程高峰!
【8月更文挑战第29天】Spring Boot与MyBatis分别是Java快速开发和持久层框架的优秀代表。本文通过整合Spring Boot与MyBatis,展示了如何在项目中添加相关依赖、配置数据源及MyBatis,并通过实战示例介绍了实体类、Mapper接口及Controller的创建过程。通过本文,你将学会如何利用这两款工具提高开发效率,实现数据的增删查改等复杂操作,为实际项目开发提供有力支持。
200 0
|
3月前
|
Java 开发者 Spring
Spring Boot实战宝典:揭秘定时任务的幕后英雄,让业务处理如流水般顺畅,轻松驾驭时间管理艺术!
【8月更文挑战第29天】在现代应用开发中,定时任务如数据备份、报告生成等至关重要。Spring Boot作为流行的Java框架,凭借其强大的集成能力和简洁的配置方式,为开发者提供了高效的定时任务解决方案。本文详细介绍了如何在Spring Boot项目中启用定时任务支持、编写定时任务方法,并通过实战案例展示了其在业务场景中的应用,同时提供了注意事项以确保任务的正确执行。
53 0
下一篇
无影云桌面