RabbitMQ入门

简介: RabbitMQ是基于Erlang开发的开源消息中间件,支持AMQP协议,实现应用间解耦与异步通信。其核心组件包括生产者、消费者、队列、交换机和虚拟主机,可通过Docker快速部署并结合SpringAMQP实现消息收发。

2.1 RabbitMQ介绍

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

RabbitMQ的架构如图:

其中包含几个概念:

  • publisher:生产者,也就是发送消息的应用程序
  • consumer:消费者,也就是消费消息的应用程序
  • queue:队列,存储消息的缓冲区。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理,下一节我们就一起来学习控制台的使用。

2.2. 安装

已帮大家装好mq,但是这里面有很多冗余的数据,你可以直接启动

  • docker ps -a:查看所有,可以看到有单机mq,和集群的mq1~3
  • docker start mq启动即可

或先删除【建议】

  • docker rm -f mq
  • 然后执行下面的启动脚本即可

或启动时候,命名一个新的

docker run \

-e RABBITMQ_DEFAULT_USER=itheima \

-e RABBITMQ_DEFAULT_PASS=123321 \

-v mq-plugins:/plugins \

--name mq197 \

-p 15672:15672 \

-p 5672:5672 \

-d \

rabbitmq:3.8-management

我们同样基于Docker来安装RabbitMQ,使用下面的命令即可:

找到课前资料下的mq.tar(rabbitmq的镜像文件),上传到/root下。

利用docker load命令加载:docker load -i mq.tar

执行下边的脚本创建容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.101.68:15672 即可看到管理控制台。首次访问需要登录,默认的用户名:itheima,密码:123321。登录后即可看到管理控制台总览页面:

2.3. 收发消息

2.3.1 搭建环境

RabbitMQ安装成功后下边我们编写消息发送与消息接收程序实现收发消息,如下图:publisher即消息发送者将消息发送到MQ的队列中,consumer即消息消费者从MQ中接收消息。

RabbitMQ通信采用了AMQP (Advanced Message Queuing Protocol) 协议,因此它具备跨语言的特性,任何语言只要遵循AMQP协议都可以使用RabbitMQ收发消息。

RabbitMQ官方也提供了各种不同语言的客户端API,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

下边使用SpringAMQP实现消息收发,上图是RabbitMQ最简单的工作模型,我们仅作测试使用,这种模式一般很少在生产中使用。

在课前资料给大家提供了一个Demo工程,方便我们学习SpringAMQP的使用:

将其复制到你的工作空间,然后用Idea打开,项目结构如图:

包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者

在mq-demo这个父工程中,已经配置好了SpringAMQP相关的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.itcast.demo</groupId>
  <artifactId>mq-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <modules>
    <module>publisher</module>
    <module>consumer</module>
  </modules>
  <packaging>pom</packaging>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.12</version>
    <relativePath/>
  </parent>
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
  </dependencies>
</project>

设置java版本

因此,子工程中就可以直接使用SpringAMQP了。

2.3.2 创建队列

首先进入RabbitMQ控制台创建队列,新建一个队列:simple.queue

添加成功:

接下来,我们就可以利用Java代码收发消息了。

2.3.3 消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在publisher服务的test下编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
        log.info("消息发送成功:{}",message);
    }
}

打开控制台,可以看到消息已经发送到队列中:

接下来,我们再来实现消息接收。

我们可以在RabbitMQ的控制台去查看消息

2.3.4 消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
    // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

2.3.5 测试

测试流程:

  • 启动consumer服务
  • 在publisher服务中运行测试代码,发送MQ消息。
  • 观察consumer控制台日志收到消息:

2.3.6 推模式与拉模式

在RabbitMQ中,消息传递给消费者的方式有两种:推模式(Push)和拉模式(Pull)。

  1. 推模式(Push):这是最常用的模式,在这种模式下,一旦消费者订阅了一个队列,RabbitMQ就会自动将队列中的消息发送给消费者,这种方式不需要消费者持续地询问是否有新的消息,而是由Broker在有消息时主动发送给消费者。
  2. 拉模式(Pull):在这种模式下,消费者需要主动请求Broker来获取消息。

在实践中,推模式更常见,因为它可以减少消费者的网络负载,并且可以让Broker更好地控制消息的传递速率。然而,拉模式也有其应用场景,比如当消费者想要精确控制消息获取的时候。

RabbitMQ默认采用的是推模式,但同时也提供了拉模式的支持,以满足不同的应用场景需求。

2.4. 数据隔离

2.4.1 用户管理

RabbitMQ支持多租户,多个系统可以同时使用一个RabbitMQ。

什么是多租户一种软件架构设计模式,允许多个租户(用户、组织或客户)共享同一套应用程序或系统实例,同时确保每个租户的数据和配置是隔离的。比如一张表增加一个companyId,那么就是不同的公司共用一套代码+数据库,但是又做了数据隔离

公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用虚拟主机进行隔离,将不同项目隶属不同的虚拟主机。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的虚拟主机,将每个项目的数据隔离。

下边学习用户管理和虚拟主机的配置。

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名
  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限
  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

下边我们给黑马商城创建一个新的用户,命名为hmall,密码123,注意选择adminstrator

你会发现此时hmall用户没有任何virtual host的访问权限:

接下来我们来给hmall用户授权。

2.4.2 虚拟主机

下边配置虚拟主机。

我们先退出登录:

切换到刚刚创建的hmall用户登录,然后点击Virtual Hosts菜单,进入virtual host管理页:

可以看到目前只有一个默认的virtual host,名字为 /

我们可以给黑马商城项目创建一个单独的virtual host,而不是使用默认的/

创建完成后如图:

由于我们是登录hmall账户后创建的virtual host,因此回到users菜单,你会发现当前用户已经具备了对/hmall这个virtual host的访问权限了:

此时,点击页面右上角的virtual host下拉菜单,切换virtual host/hmall

然后再次查看queues选项卡,会发现之前的队列已经看不到了:

这就是基于virtual host 的隔离效果。

2.4.3 测试

下边我们用hmall用户及新创建虚拟主机。

修改publisher及consumer的application.yml

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

重新进行收发消息的测试,发现消息发发放到了virtual-host: /hmall 下的队列,如下图:

再启动consumer服务接收消息。

3 RabbitMQ工作模型

相关文章
Topic交换机(自行测试)
Topic交换机支持通配符匹配RoutingKey,实现灵活路由。BindingKey用`.`分隔,`*`匹配一个词,`#`匹配零个或多个词。相比Direct交换机,Topic更适用于复杂路由场景。
Direct交换机
Direct交换机根据RoutingKey将消息路由到指定队列,实现精准消息分发。与Fanout广播模式不同,Direct支持多队列绑定相同Key,兼具灵活性与定向投递优势。
Direct交换机
|
1天前
|
项目管理 开发者
业务架构图
本文系统阐述了业务架构图的核心概念与绘制方法,涵盖业务定义、架构分层(组织层、应用层、能力层、基础层)、模块划分及功能分解,并结合医院场景示例,说明如何通过分层、分模块、分功能构建清晰的业务视图,提升客户理解与开发效率。
|
1天前
|
存储 Dubbo API
SpringCloud工程部署启动
本文介绍SpringCloud微服务工程搭建全过程,涵盖项目创建、数据库配置、服务部署及远程调用实现。通过两种方案导入工程,完成user-service与order-service模块开发,并利用RestTemplate实现跨服务数据调用,帮助理解微服务间通信机制及拆分逻辑。
|
1天前
|
SQL 关系型数据库 数据库
分布式事务
本文介绍了分布式事务的概念、典型场景及解决方案。在微服务架构下,一次业务操作需跨多个数据库和远程调用协作完成,传统本地事务无法保证整体一致性。通过Seata框架可实现分布式事务控制,其AT模式无侵入、高性能,基于两阶段提交与undo log实现最终一致;XA模式则提供强一致性但性能较低。文章还结合下单、支付等场景演示了Seata的集成与应用。
|
1天前
|
人工智能 监控 Java
请求限流
本文介绍如何使用Sentinel实现接口限流与降级,通过配置QPS阈值保护商品查询接口,并结合JMeter进行压测验证。同时讲解了线程隔离机制,包括信号量隔离的应用,确保系统在高并发下的稳定性。
请求限流
|
1天前
|
监控 Java Sentinel
熔断降级
熔断降级是防止服务雪崩的核心机制,通过Sentinel实现。熔断由客户端断路器统计异常或慢请求比例,超阈值后拦截请求;降级则返回默认数据保障体验。结合使用可快速失败、避免级联故障。
|
1天前
|
人工智能 Java 应用服务中间件
微服务保护
本节介绍微服务雪崩问题及保护方案。当某服务故障或负载过高,可能引发级联失败,导致整个系统不可用。为避免此问题,需采取熔断、降级、超时、线程隔离和限流等措施。常用工具包括Hystrix、Resilience4j和Sentinel,课程重点讲解Sentinel的使用。
|
1天前
|
存储 缓存 负载均衡
Nacos注册中心
本文介绍Nacos的安装部署、服务注册与发现、权重控制、集群隔离及临时/持久实例等核心功能,涵盖从环境搭建到高级配置的完整实践,助力微服务架构高效管理。
 Nacos注册中心
|
1天前
|
运维 安全 Devops
生产环境缺陷管理
针对大型团队多分支开发中bug协同管理复杂、易遗漏等问题,我们基于go-git开发了通用型工具git-poison,实现分布式、自动化bug追溯与阻塞发布。通过“投毒-解毒-银针”机制,打通开发、测试、运维流程,降低沟通成本,避免因人为疏漏导致的生产故障,提升发布安全与效率。