RabbitMQ实战(四) - RabbitMQ & Spring整合开发

简介: 你将学到 RabbitMQ 整合 Spring AMQP实战 RabbitMQ 整合 Spring Boot实战 RabbitMQ 整合 Spring Cloud实战

0 相关源码

1 你将学到

  • RabbitMQ 整合 Spring AMQP实战
  • RabbitMQ 整合 Spring Boot实战
  • RabbitMQ 整合 Spring Cloud实战

2 SpringAMQP用户管理组件 - RabbitAdmin

RabbitAdmin 类可以很好的操作 rabbitMQ,在 Spring 中直接进行注入即可

autoStartup 必须设置为 true,否则 Spring 容器不会加载它.

2.1 源码分析

RabbitAdmin 的底层实现

  • 从 Spring 容器中获取 Exchange、Bingding、Routingkey 以及Queue 的 @Bean 声明
  • 然后使用 rabbitTemplate 的 execute 方法进行执行对应的声明、修改、删除等一系列 RabbitMQ 基础功能操作。例如添加交换机、删除一个绑定、清空一个队列里的消息等等
  • 依赖结构

RabbitAdmin实现了4个Interface: AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,InitializingBean。

AmqpAdmin

为AMQP指定一组基本的便携式AMQP管理操作

ApplicationEventPublisherAware

实现该接口的类,通过函数setApplicationEventPublisher()获得它执行所在的ApplicationEventPublisher。

在这里插入图片描述

ApplicationContextAware

实现该接口的类,通过函数setApplicationContext()获得它执行所在的ApplicationContext。一般用来初始化object

InitializingBean

若class中实现该接口,在Spring Container中的bean生成之后,自动调用函数afterPropertiesSet()。

因其实现了InitializingBean接口,其中只有一个方法,且在Bean加载后就执行

该功能可以被用来检查是否所有的mandatory properties都设置好

  • 以上Interfaces的执行顺序
    ApplicationEventPublisherAware -> ApplicationContextAware -> InitializingBean.

RabbitAdmin借助于 ApplicationContextAware 和 InitializingBean来获取我们在配置类中声明的exchange, queue, binding beans等信息并调用channel的相应方法来声明。

  • 首先,RabbitAdmin借助于ApplicationContextAware来获取ApplicationContext applicationContext
  • 然后,借助于InitializingBean以及上面的applicationContext来实现rabbitMQ entity的声明

下面是RabbitAdmin中afterPropertiesSet()函数的代码片段。这里在创建connection的时候调用函数initialize()。

于是以此为突破口进行源码分析

  • RabbitAdmin#afterPropertiesSet

这里

最后分别调用函数declareExchanges(),declareQueues(),declareBindings()来声明RabbitMQ Entity

  • 先定义了三个集合,利用applicationContext.getBeansOfType来获得container中的Exchange,Queue,Binding声明放入集合中
  • 然后调用filterDeclarables()来过滤不能declareable的bean
  • 按照RabbitMQ的方式拼接
  • 使用rabbitTemplate执行交互
    2.2 实操

回顾一下消费者配置

1. 设置交换机类型

2. 将队列绑定到交换机

交换机类型:

    FanoutExchange 类型: 将消息分发到所有的绑定队列,无 routingkey 的概念

    HeadersExchange 类型:通过添加属性 key-value 匹配

    DirectExchange :按照 routingkey 分发到指定队列

    TopicExchange : 多关键字匹配

  • 测试代码
  • 查看管控台
    3 SpringAMQP - RabbitMQ声明式配置使用SpringAMQP 声明即在 rabbit 基础 API 里面声明一个 exchange、Bingding、queue。使用SpringAMQP 去声明,就需要使用 @Bean 的声明方式

  • 查看管控台
    3 消息模板 - RabbitTemplate上节中最后提到,这是与与 SpringAMQP 整合发送消息的关键类,它提供了丰富的发送消息方法

包括可靠性投递消息方法、回调监听消息接口 ConfirmCallback、返回值确认接口 ReturnCallback等.
同样我们需要注入到 Spring 容器中,然后直接使用.
RabbitTemplate 在 Spring 整合时需要实例化,但是在 Springboot 整合时,在配置文件里添加配置即可

  • 先声明bean
  • 测试4 SpringAMQP消息容器-SimpleMessageListenerContainer这个类非常的强大,我们可以对他进行很多的设置,用对于消费者的配置项,这个类都可以满足。它有监听单个或多个队列、自动启动、自动声明功能。
  • 设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。但是我们在实际生产中,很少使用事务,基本都是采用补偿机制
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕获 Handler 函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等。

SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。很多基于 RabbitMQ 的自制定化后端管控台在进行设置的时候,也是根据这一去实现的




5 SpringAMQP消息适配器-MessageListenerAdapter消息监听适配器,通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换.
允许监听器方法对消息内容类型进行操作,完全独立于RabbitMQ API

默认情况下,传入Rabbit消息的内容在被传递到目标监听器方法之前被提取,以使目标方法对消息内容类型进行操作以String或者byte类型进行操作,而不是原始Message类型。 (消息转换器)

消息类型转换委托给MessageConverter接口的实现类。 默认情况下,将使用SimpleMessageConverter。 (如果您不希望进行这样的自动消息转换,

那么请自己通过#setMessageConverter MessageConverter设置为null)

如果目标监听器方法返回一个非空对象(通常是消息内容类型,例如String或byte数组),它将被包装在一个Rabbit Message 中,并发送使用来自Rabbit ReplyTo属性或通过#setResponseRoutingKey(String)指定的routingKey的routingKey来传送消息。(使用rabbitmq 来实现异步rpc功能时候会使用到这个属性)。

注意:发送响应消息仅在使用ChannelAwareMessageListener入口点(通常通过Spring消息监听器容器)时可用。 用作MessageListener不支持生成响应消息。

源码分析

继承自AbstractAdaptableMessageListener类,实现了MessageListenerChannelAwareMessageListener接口

MessageListenerChannelAwareMessageListener接口的onMessage方法就是具体容器监听队列处理队列消息的方法

实操

  • 委托类MessageDelegate,类中定义的方法也就是目标监听器的处理方法
  • 配置类代码
  • 运行测试代码
  • 结果

从源码分析小节中的成员变量,我们可以看出使用MessageListenerAdapter处理器进行消息队列监听处理

  • 如果容器没有设置setDefaultListenerMethod
    则处理器中默认的处理方法名是handleMessage
  • 如果设置了setDefaultListenerMethod

则处理器中处理消息的方法名就是setDefaultListenerMethod方法参数设置的值

也可以通过setQueueOrTagToMethodName方法为不同的队列设置不同的消息处理方法。

MessageListenerAdapteronMessage方法

  • 如果将参数改为String运行会出错!应当是字节数组,这时就需要使用转换器才能保证正常运行
  • 使用转换器



测试代码运行成功!


6 消息转换器 - MessageConverter我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到 MessageConverter

  • 我们自定义常用转换器,都需要实现这个接口,然后重写其中的两个方法
    常见的转换器
  • Json 转换器 - jackson2JsonMessageConverter
    Java 对象的转换功能
  • DefaultJackson2JavaTypeMapper 映射器
    Java对象的映射关系
  • 自定义二进制转换器
    比如图片类型、PDF、PPT、流媒体实操
  • Order类

  • 配置JSON转换器
  • 测试代码

  • 配置Java对象转换器
  • 测试代码及结果
  • 多个Java对象映射转换
  • 测试代码及结果

  • 全局转换器
  • 图片转换器实现
  • PDF转换器实现

  • 测试代码及结果
    7 RabbitMQ与SpringBoot2.x整合实战7.1 配置详解
  • publisher-confirms
    实现一个监听器监听 broker 给我们返回的确认请求RabbitTemplate.ConfirmCallback
  • publisher-returns
    保证消息对 broker 可达,若出现路由键不可达情况,则使用监听器对不可达消息后续处理,保证消息路由成功 - RabbitTemplate.ReturnCallback在发送消息的时候对 template 进行配置 mandatory = true 保证监听有效

在生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等Pro

  • 配置文件
  • 主配置
  • 添加一个自定义的交换机
  • 添加一个Q
  • 建立绑定关系


  • 测试及结果
    Con配置消费端的 RabbitListener 是一个组合注解,里面可以注解配置 。

@QueueBinding @Queue @Exchange 直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。

  • 将Pro中的绑定全部删除,再启动Con的sb服务

发送一个 Java 实体对象

  • 在Con声明队列、交换机、routingKey基本配置
  • Con
    Payload 注解中的路径要跟Pro的实体路径完全一致,要不然会找到不到该类,这里为了简便就不写一个 common.jar 了,在实际开发里面,这个 Java Bean 应该放在 common.jar中
  • 注意实体要实现 Serializable 序列化接口,要不然发送消息会失败
  • Pro 照样跟着写一个发消息的方法
  • 测试代码及结果

8 RabbitMQ & Spring Cloud Stream整合实战Spring Cloud全家桶在整个中小型互联网公司异常的火爆,Spring Cloud Stream也就渐渐的被大家所熟知,本小节主要来绍RabbitMQ与Spring Cloud Stream如何集成8.1 编程模型要了解编程模型,您应该熟悉以下核心概念

  • 目标绑定器
    提供与外部消息传递系统集成的组件
  • 目标绑定
    外部消息传递系统和应用程序之间的桥接提供的生产者和消费者消息(由目标绑定器创建)
  • 消息
    生产者和消费者用于与目标绑定器(以及通过外部消息传递系统的其他应用程序)通信的规范数据结构

8.2 应用模型Spring Cloud Stream应用程序由中间件中立核心组成。该应用程序通过Spring Cloud Stream注入其中的输入和输出通道与外界通信。通过中间件特定的Binder实现,通道连接到外部代理。
8.3 RabbitMQ绑定概述默认情况下,RabbitMQ Binder实现将每个目标映射到TopicExchange。对于每个使用者组,Queue绑定到该TopicExchange。每个使用者实例都为其组的Queue具有相应的RabbitMQ Consumer实例。对于分区生成器和使用者,队列以分区索引为后缀,并使用分区索引作为路由键。对于匿名使用者(没有组属性的用户),使用自动删除队列(具有随机的唯一名称)。

Barista接口: Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一 通道进行发送消息还是从中接收消息

8.4 扩展 - 注解

  • @Output:输出注解,用于定义发送消息接口
  • @Input:输入注解,用于定义消息的消费者接口
  • @StreamListener:用于定义监听方法的注解

使用Spring Cloud Stream非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常适合,但是使用SpringCloudStream框架有一个非常大的问题就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题

这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的!这点就是在当前版本Spring Cloud Stream的定位

8.5 实操

Pro

  • pom核心文件
  • Sender

注解@EnableBinding声明了这个应用程序绑定了2个通道:INPUT和OUTPUT。这2个通道是在接口Barista中定义的(Spring Cloud Stream默认设置)。所有通道都是配置在一个具体的消息中间件或绑定器中

  • Barista接口
  • @Input
    声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
  • @Output
    声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
  • Bindings — 声明输入和输出通道的接口集合。
  • Binder — 消息中间件的实现,如Kafka或RabbitMQ
  • Channel — 表示消息中间件和应用程序之间的通信管道
  • StreamListeners — bean中的消息处理方法,在中间件的MessageConverter特定事件中进行对象序列化/反序列化之后,将在信道上的消息上自动调用消息处理方法。
  • Message Schemas — 用于消息的序列化和反序列化,这些模式可以静态读取或者动态加载,支持对象类型的演变。

将消息发布到指定目的地是由发布订阅消息模式传递。发布者将消息分类为主题,每个主题由名称标识。订阅方对一个或多个主题表示兴趣。中间件过滤消息,将感兴趣的主题传递给订阅服务器。订阅方可以分组,消费者组是由组ID标识的一组订户或消费者,其中从主题或主题的分区中的消息以负载均衡的方式递送。

Con

  • Pom核心文件
  • 应用启动类
  • Barista接口
  • 配置文件
  • 接收
  • 启动Con服务,查看管控台


  • 运行Pro测试代码及结果


9 总结

本文我们学习了Spring AMQP的相关知识,通过实战对RabbitMQ集成Spring有了直观的认识,这样为

我们后续的学习、工作使用都打下了坚实的基础,最后我们整合了SpringBoot与Spring Cloud Stream,更方便更高效的集成到我们的应用服务中去!

参考

SpringAMQP 用户管理组件 RabbitAdmin 以及声明式配置

Spring Boot - RabbitMQ源码分析

SpringAMQP 之 RabbitTemplate

SpringAMQP 消息容器 - SimpleMessageListenerContainer

MessageListenerAdapter详解

SpringAMQP 消息转换器 - MessageConverter

RabbitMQ 与 SpringBoot2.X 整合

Spring Cloud Stream

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
18天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
773 8
|
29天前
|
Java 数据库连接 数据格式
【Java笔记+踩坑】Spring基础2——IOC,DI注解开发、整合Mybatis,Junit
IOC/DI配置管理DruidDataSource和properties、核心容器的创建、获取bean的方式、spring注解开发、注解开发管理第三方bean、Spring整合Mybatis和Junit
【Java笔记+踩坑】Spring基础2——IOC,DI注解开发、整合Mybatis,Junit
|
29天前
|
Java 数据库连接 Maven
Spring基础1——Spring(配置开发版),IOC和DI
spring介绍、入门案例、控制反转IOC、IOC容器、Bean、依赖注入DI
Spring基础1——Spring(配置开发版),IOC和DI
|
1月前
|
缓存 Java 应用服务中间件
随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架
【9月更文挑战第6天】随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架。Nginx作为高性能的HTTP反向代理服务器,常用于前端负载均衡,提升应用的可用性和响应速度。本文详细介绍如何通过合理配置实现Spring Boot与Nginx的高效协同工作,包括负载均衡策略、静态资源缓存、数据压缩传输及Spring Boot内部优化(如线程池配置、缓存策略等)。通过这些方法,开发者可以显著提升系统的整体性能,打造高性能、高可用的Web应用。
59 2
|
1月前
|
NoSQL 前端开发 Java
使用 Spring Boot + Neo4j 实现知识图谱功能开发
在数据驱动的时代,知识图谱作为一种强大的信息组织方式,正逐渐在各个领域展现出其独特的价值。本文将围绕使用Spring Boot结合Neo4j图数据库来实现知识图谱功能开发的技术细节进行分享,帮助读者理解并掌握这一技术栈在实际项目中的应用。
114 4
|
1月前
|
安全 Java 开发者
强大!Spring Cloud Gateway新特性及高级开发技巧
在微服务架构日益盛行的今天,网关作为微服务架构中的关键组件,承担着路由、安全、监控、限流等多重职责。Spring Cloud Gateway作为新一代的微服务网关,凭借其基于Spring Framework 5、Project Reactor和Spring Boot 2.0的强大技术栈,正逐步成为业界的主流选择。本文将深入探讨Spring Cloud Gateway的新特性及高级开发技巧,助力开发者更好地掌握这一强大的网关工具。
132 6
|
1月前
|
IDE Java 开发工具
还在为繁琐的配置头疼吗?一文教你如何用 Spring Boot 快速启动,让开发效率飙升,从此告别加班——打造你的首个轻量级应用!
【9月更文挑战第2天】Spring Boot 是一款基于 Spring 框架的简化开发工具包,采用“约定优于配置”的原则,帮助开发者快速创建独立的生产级应用程序。本文将指导您完成首个 Spring Boot 项目的搭建过程,包括环境配置、项目初始化、添加依赖、编写控制器及运行应用。首先需确保 JDK 版本不低于 8,并安装支持 Spring Boot 的现代 IDE,如 IntelliJ IDEA 或 Eclipse。
92 5
|
2月前
|
Java Spring 人工智能
AI 时代浪潮下,Spring 框架异步编程点亮高效开发之路,你还在等什么?
【8月更文挑战第31天】在快节奏的软件开发中,Spring框架通过@Async注解和异步执行器提供了强大的异步编程工具,提升应用性能与用户体验。异步编程如同魔法,使任务在后台执行而不阻塞主线程,保持界面流畅。只需添加@Async注解即可实现方法的异步执行,或通过配置异步执行器来管理线程池,提高系统吞吐量和资源利用率。尽管存在线程安全等问题,但异步编程能显著增强应用的响应性和效率。
37 0
|
2月前
|
Java Spring 开发者
解锁 Spring Boot 自动化配置的黑科技:带你走进一键配置的高效开发新时代,再也不怕繁琐设置!
【8月更文挑战第31天】Spring Boot 的自动化配置机制极大简化了开发流程,使开发者能专注业务逻辑。通过 `@SpringBootApplication` 注解组合,特别是 `@EnableAutoConfiguration`,Spring Boot 可自动激活所需配置。例如,添加 JPA 依赖后,只需在 `application.properties` 配置数据库信息,即可自动完成 JPA 和数据源设置。这一机制基于多种条件注解(如 `@ConditionalOnClass`)实现智能配置。深入理解该机制有助于提升开发效率并更好地解决问题。
55 0
|
2月前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
66 0

热门文章

最新文章