RabbitMQ 进阶 -- SpringBoot 集成 RabbitMQ实现生产者与消费者模式

简介: Spring Boot 如何集成RabbitMQ ,详解Spring Boot集成RabbitMQ!
📢📢📢📣📣📣

哈喽!大家好,我是【 Bug 终结者,【CSDNJava领域优质创作者】🏆,阿里云专家博主🏆,51CTO人气博主🏆,InfoQ写作专家🏆 <br/>
一位上进心十足,拥有极强学习力的【 Java领域博主】😜😜😜 <br/>
🏅【Bug 终结者】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。 偶尔会分享些前端基础知识,会更新实战项目,面向企业级开发应用
🏅 如果有对【后端技术】、【前端领域】感兴趣的【小可爱】,欢迎关注【Bug 终结者】💞💞💞


❤️❤️❤️ 感谢各位大可爱小可爱! ❤️❤️❤️

在这里插入图片描述

@[TOC]

一、什么是 AMQP?

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级 消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/ 中间件不同产品,不同的开发语言等条件的限制。 Erlang中的实现有 RabbitMQ等。

AMQP 基于TCP协议之上再次封装的协议,AMQP定义了合适的服务器端域模型,规范服务器的行为(AMQP的服务器端称broker),

☁️AMQP的主要功能

消息中间件的主要功能就是消息的 路由(routing) 和 缓存(Buffering)

AMQP提供了两个重要的模型,Exchange(交换机) 和 Queue (队列)

Exchange的作用

Exchange接收Producer发送的Message,根据不同的路由算法,将Message发送给Message Queue.

Message Queue的作用
  • Message Queue 在 Message没有被 Consumer消费时,缓存这些Message,具体的缓存策略由实现者决定
  • 当Message Queue 与 Message Consumer之间连接畅通时,Message Queue 则需要将消息转发给 Consumer进行消费

注意,如果队列没有指定交换机,则使用 Default 默认交换机

二、RabbitMQ的核心组成

在这里插入图片描述

核心概念

  • Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
  • Connection:连接,应用程序与Broker的网络连接 TCP/IP/三次握手和四次挥手
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
  • Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
  • Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
  • Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
  • Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
  • Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
  • Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

三、RabbitMQ的运行流程

在这里插入图片描述

上图为生产者生产消息与消费者订阅并消费消息的大致流程图

四、RabbitMQ支持的消息模式

在这里插入图片描述

具体的模式案例请参考官网:https://www.rabbitmq.com/getstarted.html

工作队列和发布订阅/广播模式用的比较多! 路由模式会消耗一定的内存,要加where筛选过滤

五、RabbitMQ使用场景

解耦、削峰、异步

⛅同步异步问题

串行和并行

串行方式: 将订单信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务全部完成后,返回给客户端

在这里插入图片描述

并行方式 异步线程池

并发方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信,以上三个任务全部完成后,返回给客户端,与串行的差别是,并行的方式可以提高处理的时间

在这里插入图片描述

存在的问题

  • 耦合度高
  • 需要自己写线程池 维护成本太高
  • 如果消息出现了丢失,需要自己做消息补偿
  • 如果保证可靠性,需要自己去写
  • 如果服务器承载不了,需要自己去写高可用

⚡异步消息队列

在这里插入图片描述

使用MQ异步消息队列的好处

  • 完全解耦,用MQ建立桥接
  • 有独立的线程池和运行模型
  • 出现了消息丢失,MQ有持久化功能
  • 如何保证消息的可靠性,死信队列和消息转移的等
  • 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。

按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

因此MQ消息队列适用于

  • 分布式事务的可靠消费和可靠生产
  • 索引、缓存、静态化处理的数据同步
  • 流量监控
  • 日志监控(ELK)
  • 下单、订单分发、抢票

MQ消息队列可达到 高内聚、低耦合

六、SpringBoot 整合RabbitMQ实现消息的生产与消费

RabbitMQ是Spring家族开发的产品,Spring 天然支持RabbitMQ,快速方便引入RabbitMQ!

这里我们介绍 SpringBoot 整合RabbitMQ 实现消息的生产与消费(广播模式/发布订阅模式)

✅创建Maven聚合工程

File ---> New ---> Project ---> Maven ---> 直接Next 进入下一步创建普通的Maven工程即可

在这里插入图片描述

创建一个默认的Maven聚合工程,将src文件夹删除,该工程就是一个Maven聚合工程

😃引入共有依赖

引入依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wanshi</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>rabbitmq-order-producer</module>
        <module>rabbitmq-order-consumer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
    </dependencyManagement>

</project>

⏳创建生产者

在项目内,新建一个Moudle,rabbitmq-order-producer 默认Maven工程,下一步即可

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitmq-order-producer</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

⌛创建消费者

在项目内,新建一个Moudle,rabbitmq-order-cousumer 默认Maven工程,下一步即可

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitmq-order-producer</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

</project>

Maven聚合工程创建完成图

在这里插入图片描述

Maven依赖图

在这里插入图片描述

自行手写MainApplication即可

创建完成!

♨️核心源码

application.yml

# 服务端口
server:
  port: 8080
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 8.130.28.198
    port: 5672
生产者

OrderService

package com.wanshi.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * @author whc
 * @date 2022/5/23 18:50
 */

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrder() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        String exchange_name = "fanout_order_exchange";
        String routeingKey = "";
        rabbitTemplate.convertAndSend(exchange_name, routeingKey, orderId);
    }
}
消费者

交换机的声明与队列我们放在消费者端,因为消费者是先开启的,如果没有交换机和队列,则会报错!

RabbitMQConfiguration

package com.wanshi.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author whc
 * @date 2022/5/23 10:18
 */
@Configuration
public class RabbitMQConfiguration {

    //1.声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    //2.声明队列,sms.fanout.queue email.fanout.queue msg.fanout.queue
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    @Bean
    public Queue msgQueue() {
        return new Queue("msg.fanout.queue", true);
    }

    //3.完成绑定关系(队列与交换机完成绑定关系)
    @Bean
    public Binding smsBind() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBind() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding msgBind() {
        return BindingBuilder.bind(msgQueue()).to(fanoutExchange());
    }
}

编写具体业务消费类

FanoutEmailConsumer

package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author whc
 * @date 2022/5/23 18:53
 */
@RabbitListener(queues = "email.fanout.queue")
@Component
public class FanoutEmailConsumer {

    @RabbitHandler
    public void messageService(String message) {
        System.out.println("fanout email ==>" + message);
    }
}

FanoutMsgConsumer

package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author whc
 * @date 2022/5/23 18:55
 */
@RabbitListener(queues = "msg.fanout.queue")
@Component
public class FanoutMsgConsumer {

    @RabbitHandler
    public void messageService(String message) {
        System.out.println("fanout msg ==>" + message);
    }
}

FanoutSmsConsumer

package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author whc
 * @date 2022/5/23 18:54
 */
@RabbitListener(queues = "sms.fanout.queue")
@Component
public class FanoutSmsConsumer {

    @RabbitHandler
    public void messageService(String message) {
        System.out.println("fanout sms ==> " + message);
    }
}

编写完成!

七、测试消息的生产与消费

启动客户端监听查看消息队列的绑定情况

启动客户端

在这里插入图片描述

查看RabbitMQ的交换机与队列绑定情况

交换机声明

在这里插入图片描述

队列声明

在这里插入图片描述

绑定关系

在这里插入图片描述

下面生产者投递消息

生产者端建立测试类

package com.wanshi;

import com.wanshi.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author whc
 * @date 2022/5/23 18:55
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class MainApplicationTest {

    @Autowired
    private OrderService orderService;

    @Test
    public void test1() {
        orderService.makeOrder();
    }
}

启动,投递成功

在这里插入图片描述

查看消息者是否成功消费消息

在这里插入图片描述

成功完成 SpringBoot 与RabbitMQ的整合,并通过发布订阅/广播模式实现

⛵小结

以上就是【Bug 终结者】对 RabbitMQ 进阶 -- SpringBoot 集成 RabbitMQ实现生产者与消费者模式简单的概述, RabbitMQ是一种消息队列中间件,引入RabbitMQ后,可大大提升程序的性能,从而拥有更高的吞吐量,达到高内聚,低耦合

如果这篇【文章】有帮助到你,希望可以给【 Bug 终结者】点个赞👍,创作不易,如果有对【 后端技术】、【 前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【 Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
39 6
|
6月前
|
数据采集 Web App开发 XML
爬虫进阶:Selenium与Ajax的无缝集成
爬虫进阶:Selenium与Ajax的无缝集成
|
7月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1112 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
数据采集 数据安全/隐私保护
Kameleo指纹浏览器进阶使用:轻松集成IPXProxy海外代理IP
Kameleo是一款出色的指纹浏览器,它能够帮助用户实现隐身浏览。大家在进行网络抓取的时候总会碰到一些阻碍,而采取指纹浏览器可以提升网络抓取的效率,并且集成代理IP能增加一层防护,让数据采集更加全面,为制定营销策略提供更好的支持。那如何将Kameleo指纹浏览器与IPXProxy海外代理IP集成?
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
426 1
|
7月前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)