异步消息组件MQ基础

简介: 本课程学习RabbitMQ在微服务中的应用,涵盖MQ的应用场景、异步与同步调用区别、RabbitMQ的安装与配置、消息收发入门、工作队列模型及交换机类型(Fanout、Direct、Topic)等核心知识,并结合SpringAMQP实现消息通信与项目实战。

学习目标

  1. 能够说出MQ的应用场景
  2. 能够编写RabbitMQ收发消息的入门程序
  3. 能够说出工作队列模型的特点
  4. 能够说出发布订阅模型的交换机类型
  5. 能够说出fanout交换机的特性
  6. 能够说出Direct交换机的特性
  7. 能够说出Topic交换机的特性
  8. 能够说出惰性队列的特性
  9. 能够说出优先级队列的特性
  10. 能够说出如何解决MQ消息堆积问题
  11. 能够在商城项目应用MQ

1.初识MQ

1.1 同步调用与异步调用

前边我们学习了微服务之间远程调用方式,通常服务端提供HTTP RESTful接口,客户端通过HTTP Client工具进行远程调用,远程调用工具有RestTemplateOpenFeign、OkHttp等技术,这些技术实现的都是一种类型即同步调用,微服务之间通信还有一种异步调用,什么是同步调用?什么是异步调用?

拿订单支付功能举例,下图表示了同步调用

支付的交互流程如下:

  1. 支付服务调用用户服务扣减余额。
  2. 余额扣减成功后支付服务更新数据库中的交易流水状态为已支付。
  3. 更新成功后支付服务调用交易服务更新订单状态为已支付。

像这种每一步调用者必须等待被调用方完全执行完毕并返回结果之后才能继续执行后续代码,叫同步调用

同步调用就是顺序执行,执行完一步再执行下一步。

什么是异步调用?

异步调用是调用者发出调用后无需等待被调用方完成就可以继续执行其他任务。

举例:

支付的交互流程如下:

  1. 支付服务调用用户服务扣减余额。
  2. 余额扣减成功后支付服务更新数据库中的交易流水状态为已支付。
  3. 更新交易流水成功支付服务向消息中间件发消息(XX订单支付成功),此时支付服务程序执行结束
  4. 消息中间件去通知交易服务更新订单状态
  5. 消息中间件去通知短信服务通知用户订单支付成功啦

上边交互流程中前3步为同步调用,后2步为异步调用

支付服务向消息中间件发过消息后不用等继续执行其它操作,这就是异步调用

现在生活中异步调用的例子很多:

餐厅点餐:

想象一下你去一家餐厅吃饭,你坐下后,服务员过来让你点菜。在这个过程中:

  • 异步调用:你告诉服务员你想要什么菜品,然后服务员把订单送到厨房。你不需要等待食物准备完成,可以继续聊天或浏览菜单。当食物准备好时,服务员会将食物端到你的桌上。
  • 同步调用:如果你必须站在厨房门口等着厨师为你准备食物,那么这就是一个同步的过程。你无法做其他事情,直到食物准备完毕。

医院挂号:

  • 异步调用:打电话挂号,接通电话你告诉工作人员挂哪个科室,工作人员让你挂断电话稍后通过手机查看挂号结果。
  • 同步调用:打电话挂号,接通电话你告诉工作人员挂哪个科室,工作人员开始查看该科室是否有号,并进行挂号,挂号结束告诉你几点来看病,最后挂断电话。

同步调用:

特点:

  • 在同步调用中,调用者必须等待被调用方完全执行完毕并返回结果之后才能继续执行后续代码。
  • 控制流是线性的,即程序按照顺序执行每个操作。
  • 如果被调用方执行时间较长,那么整个程序会处于等待状态,直到该调用完成。

适合场景:

  • 实时性要求较高的场景,例如用户界面操作。
  • 调用简单且执行快速的操作。

优点

  • 简单直观:代码易于理解和编写,因为它是按顺序执行的。
  • 易于调试:由于执行顺序明确,调试起来相对容易。

缺点

  • 阻塞执行:如果一个操作需要很长时间来完成,那么整个程序会被阻塞,不能执行其他任务。
  • 资源浪费:在等待长时间操作完成时,CPU和其他资源可能会处于空闲状态。

异步调用:

特点:

  • 在异步调用中,调用者发出调用后无需等待被调用方完成就可以继续执行其他任务。
  • 被调用方完成操作后,通常会通过回调函数、事件通知或者完成信号等方式告知调用者。
  • 可以提高程序的并发能力和响应速度

适合场景:

  • 需要处理耗时操作,例如网络请求、文件I/O等。
  • 多个操作之间不存在严格的依赖关系。
  • 对系统性能和响应时间有较高要求的应用场景。

优点

  • 提高效率:异步调用可以使程序在等待某些耗时操作完成的同时执行其他任务,提高整体执行效率。
  • 资源利用率高:在等待耗时操作时,可以释放资源给其他任务使用,避免了资源浪费。
  • 更好的用户体验:在网络应用中,用户不必等待页面加载完成就能进行其他操作,提高了用户体验。

缺点

  • 复杂性增加:异步编程通常比同步编程更复杂,因为它涉及更多的控制结构和错误处理逻辑。
  • 调试困难:由于执行路径不是线性的,调试起来相对困难。

1.2 初识MQ

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker(消息代理/消息中间件):管理、存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接收者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

AI:消息队列的应用场景

  1. 异步处理:
  1. 场景: 当应用程序需要执行耗时的操作(如发送电子邮件、文件上传或下载等)时,可以将这些任务发送到消息队列中,由专门的任务处理程序异步执行。
  2. 好处: 减少了用户的等待时间,提高了用户体验。
  1. 解耦:
  1. 场景: 当一个系统由多个组件组成时,这些组件之间可以使用消息队列进行通信。
  2. 好处: 单个组件的变化不会直接影响到其他组件,提高了系统的可扩展性。
  1. 流量削峰
  1. 场景:在许多互联网应用和服务中,尤其是那些具有明显周期性流量特征的应用(如电商平台、社交网络等),常常会遇到流量突增的情况。例如,在电商促销期间,大量的用户会在短时间内访问网站并提交订单,这种短时间内产生的巨大流量可能会导致服务器过载,影响用户体验甚至导致服务不可用。
  2. 好处:当流量激增时接收到请求会被暂时存储在消息队列中,而不是直接发送到后端服务进行处理。这样做可以避免后端服务因为短时间内处理大量请求而过载。后端服务可以从消息队列中按需拉取消息进行处理。这种异步处理机制可以有效地分散流量峰值,确保后端服务的稳定运行。如下图:

目比较常见的MQ实现:

几种常见MQ的对比:

  • 追求可靠性:RabbitMQ、RocketMQ
  • 追求吞吐(高并发)能力:RocketMQ、Kafka
  • 追求消息低延迟:RabbitMQ、Kafka

这四种的MQ在市场都是非常流行,本课程讲解RabbitMQ。

Spring Boot默认支持AMQP协议,RabbitMQ支持AMQP协议 。

1.3 面试题

MQ有什么应用场景?

2 RabbitMQ入门

2.1 RabbitMQ介绍

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

RabbitMQ的架构如图:

其中包含几个概念:

  • publisher:生产者,也就是发送消息的应用程序
  • consumer:消费者,也就是消费消息的应用程序
  • queue:队列,存储消息的缓冲区。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理,下一节我们就一起来学习控制台的使用。

2.2. 安装

已帮大家装好mq,但是这里面有很多冗余的数据,你可以直接启动

  • docker ps -a:查看所有,可以看到有单机mq,和集群的mq1~3
  • docker start mq启动即可

或先删除【建议】

  • docker rm -f mq
  • 然后执行下面的启动脚本即可

或启动时候,命名一个新的

docker run \

-e RABBITMQ_DEFAULT_USER=itheima \

-e RABBITMQ_DEFAULT_PASS=123321 \

-v mq-plugins:/plugins \

--name mq197 \

-p 15672:15672 \

-p 5672:5672 \

-d \

rabbitmq:3.8-management

我们同样基于Docker来安装RabbitMQ,使用下面的命令即可:

找到课前资料下的mq.tar(rabbitmq的镜像文件),上传到/root下。

利用docker load命令加载:docker load -i mq.tar

执行下边的脚本创建容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.101.68:15672 即可看到管理控制台。首次访问需要登录,默认的用户名:itheima,密码:123321。登录后即可看到管理控制台总览页面:

2.3. 收发消息

2.3.1 搭建环境

RabbitMQ安装成功后下边我们编写消息发送与消息接收程序实现收发消息,如下图:publisher即消息发送者将消息发送到MQ的队列中,consumer即消息消费者从MQ中接收消息。

RabbitMQ通信采用了AMQP (Advanced Message Queuing Protocol) 协议,因此它具备跨语言的特性,任何语言只要遵循AMQP协议都可以使用RabbitMQ收发消息。

RabbitMQ官方也提供了各种不同语言的客户端API,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

下边使用SpringAMQP实现消息收发,上图是RabbitMQ最简单的工作模型,我们仅作测试使用,这种模式一般很少在生产中使用。

在课前资料给大家提供了一个Demo工程,方便我们学习SpringAMQP的使用:

将其复制到你的工作空间,然后用Idea打开,项目结构如图:

包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者

在mq-demo这个父工程中,已经配置好了SpringAMQP相关的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.itcast.demo</groupId>
  <artifactId>mq-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <modules>
    <module>publisher</module>
    <module>consumer</module>
  </modules>
  <packaging>pom</packaging>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.12</version>
    <relativePath/>
  </parent>
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
  </dependencies>
</project>

设置java版本

因此,子工程中就可以直接使用SpringAMQP了。

2.3.2 创建队列

首先进入RabbitMQ控制台创建队列,新建一个队列:simple.queue

添加成功:

接下来,我们就可以利用Java代码收发消息了。

2.3.3 消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在publisher服务的test下编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
        log.info("消息发送成功:{}",message);
    }
}

打开控制台,可以看到消息已经发送到队列中:

接下来,我们再来实现消息接收。

我们可以在RabbitMQ的控制台去查看消息

2.3.4 消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
    // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

2.3.5 测试

测试流程:

  • 启动consumer服务
  • 在publisher服务中运行测试代码,发送MQ消息。
  • 观察consumer控制台日志收到消息:

2.3.6 推模式与拉模式

在RabbitMQ中,消息传递给消费者的方式有两种:推模式(Push)和拉模式(Pull)。

  1. 推模式(Push):这是最常用的模式,在这种模式下,一旦消费者订阅了一个队列,RabbitMQ就会自动将队列中的消息发送给消费者,这种方式不需要消费者持续地询问是否有新的消息,而是由Broker在有消息时主动发送给消费者。
  2. 拉模式(Pull):在这种模式下,消费者需要主动请求Broker来获取消息。

在实践中,推模式更常见,因为它可以减少消费者的网络负载,并且可以让Broker更好地控制消息的传递速率。然而,拉模式也有其应用场景,比如当消费者想要精确控制消息获取的时候。

RabbitMQ默认采用的是推模式,但同时也提供了拉模式的支持,以满足不同的应用场景需求。

2.4. 数据隔离

2.4.1 用户管理

RabbitMQ支持多租户,多个系统可以同时使用一个RabbitMQ。

什么是多租户一种软件架构设计模式,允许多个租户(用户、组织或客户)共享同一套应用程序或系统实例,同时确保每个租户的数据和配置是隔离的。比如一张表增加一个companyId,那么就是不同的公司共用一套代码+数据库,但是又做了数据隔离

公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用虚拟主机进行隔离,将不同项目隶属不同的虚拟主机。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的虚拟主机,将每个项目的数据隔离。

下边学习用户管理和虚拟主机的配置。

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名
  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限
  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

下边我们给黑马商城创建一个新的用户,命名为hmall,密码123,注意选择adminstrator

你会发现此时hmall用户没有任何virtual host的访问权限:

接下来我们来给hmall用户授权。

2.4.2 虚拟主机

下边配置虚拟主机。

我们先退出登录:

切换到刚刚创建的hmall用户登录,然后点击Virtual Hosts菜单,进入virtual host管理页:

可以看到目前只有一个默认的virtual host,名字为 /

我们可以给黑马商城项目创建一个单独的virtual host,而不是使用默认的/

创建完成后如图:

由于我们是登录hmall账户后创建的virtual host,因此回到users菜单,你会发现当前用户已经具备了对/hmall这个virtual host的访问权限了:

此时,点击页面右上角的virtual host下拉菜单,切换virtual host/hmall

然后再次查看queues选项卡,会发现之前的队列已经看不到了:

这就是基于virtual host 的隔离效果。

2.4.3 测试

下边我们用hmall用户及新创建虚拟主机。

修改publisher及consumer的application.yml

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

重新进行收发消息的测试,发现消息发发放到了virtual-host: /hmall 下的队列,如下图:

再启动consumer服务接收消息。

3 RabbitMQ工作模型

3.1. 工作队列模型(WorkQueues)

3.1.1 介绍

在入门程序中实现的是一种最简单的工作队列模型,消费者直接绑定到队列上,如下图:

这种方式实现最基本的异步通信,一个生产者,一个队列,一个消费者,生产者将消息发到队列,消费者从队列接收消息。

由于绑定队列的消费者只有一个所以处理消息的能力就比较弱,下边情况将不适合这种方式:

  1. 如果有大量的任务就需要多个消费者去共同处理。
  2. 当生产者发送消息的速度远远大于消费者处理任务的速度此时由于消费者只有一个将造成消息堆积。

为了解决上边的问题可以使用到下边的方式,让多个消费者绑定到一个队列,共同消费队列中的消息

3.1.2 测试

3.1.2.1 创建队列

接下来我们测试工作队列模型。

首先,我们在控制台创建一个新的队列,命名为work.queue

3.1.2.2 发送消息程序

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/**
 * workQueue
 * 向队列中不停发送消息,模拟消息堆积。
 */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "work.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

3.1.2.3 接收消息程序

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息
相关文章
|
12天前
|
数据采集 人工智能 安全
|
7天前
|
机器学习/深度学习 人工智能 前端开发
构建AI智能体:七十、小树成林,聚沙成塔:随机森林与大模型的协同进化
随机森林是一种基于决策树的集成学习算法,通过构建多棵决策树并结合它们的预测结果来提高准确性和稳定性。其核心思想包括两个随机性:Bootstrap采样(每棵树使用不同的训练子集)和特征随机选择(每棵树分裂时只考虑部分特征)。这种方法能有效处理大规模高维数据,避免过拟合,并评估特征重要性。随机森林的超参数如树的数量、最大深度等可通过网格搜索优化。该算法兼具强大预测能力和工程化优势,是机器学习中的常用基础模型。
344 164
|
6天前
|
机器学习/深度学习 自然语言处理 机器人
阿里云百炼大模型赋能|打造企业级电话智能体与智能呼叫中心完整方案
畅信达基于阿里云百炼大模型推出MVB2000V5智能呼叫中心方案,融合LLM与MRCP+WebSocket技术,实现语音识别率超95%、低延迟交互。通过电话智能体与座席助手协同,自动化处理80%咨询,降本增效显著,适配金融、电商、医疗等多行业场景。
345 155
|
7天前
|
编解码 人工智能 自然语言处理
⚽阿里云百炼通义万相 2.6 视频生成玩法手册
通义万相Wan 2.6是全球首个支持角色扮演的AI视频生成模型,可基于参考视频形象与音色生成多角色合拍、多镜头叙事的15秒长视频,实现声画同步、智能分镜,适用于影视创作、营销展示等场景。
580 4
|
15天前
|
SQL 自然语言处理 调度
Agent Skills 的一次工程实践
**本文采用 Agent Skills 实现整体智能体**,开发框架采用 AgentScope,模型使用 **qwen3-max**。Agent Skills 是 Anthropic 新推出的一种有别于mcp server的一种开发方式,用于为 AI **引入可共享的专业技能**。经验封装到**可发现、可复用的能力单元**中,每个技能以文件夹形式存在,包含特定任务的指导性说明(SKILL.md 文件)、脚本代码和资源等 。大模型可以根据需要动态加载这些技能,从而扩展自身的功能。目前不少国内外的一些框架也开始支持此种的开发方式,详细介绍如下。
1018 7