异步消息组件MQ基础

简介: 本文介绍了MQ(消息队列)的基本概念,重点对比了同步调用与异步调用的区别,通过生活实例帮助理解。异步调用通过消息中间件实现解耦、异步处理和流量削峰,提升系统性能。常见的MQ如RabbitMQ、Kafka等适用于高并发场景。RabbitMQ基于AMQP协议,支持多语言,结合SpringAMQP可轻松实现消息收发。文章还演示了RabbitMQ的安装、配置、数据隔离及工作队列模型,强调“能者多劳”机制以优化消费效率。

1.初识MQ

1.1 同步调用与异步调用

前边我们学习了微服务之间远程调用方式,通常服务端提供HTTP RESTful接口,客户端通过HTTP Client工具进行远程调用,远程调用工具有RestTemplateOpenFeign、OkHttp等技术,这些技术实现的都是一种类型即同步调用,微服务之间通信还有一种异步调用,什么是同步调用?什么是异步调用?

拿订单支付功能举例,下图表示了同步调用

支付的交互流程如下:

  1. 支付服务调用用户服务扣减余额。
  2. 余额扣减成功后支付服务更新数据库中的交易流水状态为已支付。
  3. 更新成功后支付服务调用交易服务更新订单状态为已支付。

像这种每一步调用者必须等待被调用方完全执行完毕并返回结果之后才能继续执行后续代码,叫同步调用

同步调用就是顺序执行,执行完一步再执行下一步。

什么是异步调用?

异步调用是调用者发出调用后无需等待被调用方完成就可以继续执行其他任务。

举例:

支付的交互流程如下:

  1. 支付服务调用用户服务扣减余额。
  2. 余额扣减成功后支付服务更新数据库中的交易流水状态为已支付。
  3. 更新交易流水成功支付服务向消息中间件发消息(XX订单支付成功),此时支付服务程序执行结束
  4. 消息中间件去通知交易服务更新订单状态
  5. 消息中间件去通知短信服务通知用户订单支付成功啦

上边交互流程中前3步为同步调用,后2步为异步调用

支付服务向消息中间件发过消息后不用等继续执行其它操作,这就是异步调用

现在生活中异步调用的例子很多:

餐厅点餐:

想象一下你去一家餐厅吃饭,你坐下后,服务员过来让你点菜。在这个过程中:

  • 异步调用:你告诉服务员你想要什么菜品,然后服务员把订单送到厨房。你不需要等待食物准备完成,可以继续聊天或浏览菜单。当食物准备好时,服务员会将食物端到你的桌上。
  • 同步调用:如果你必须站在厨房门口等着厨师为你准备食物,那么这就是一个同步的过程。你无法做其他事情,直到食物准备完毕。

医院挂号:

  • 异步调用:打电话挂号,接通电话你告诉工作人员挂哪个科室,工作人员让你挂断电话稍后通过手机查看挂号结果。
  • 同步调用:打电话挂号,接通电话你告诉工作人员挂哪个科室,工作人员开始查看该科室是否有号,并进行挂号,挂号结束告诉你几点来看病,最后挂断电话。

同步调用:

特点:

  • 在同步调用中,调用者必须等待被调用方完全执行完毕并返回结果之后才能继续执行后续代码。
  • 控制流是线性的,即程序按照顺序执行每个操作。
  • 如果被调用方执行时间较长,那么整个程序会处于等待状态,直到该调用完成。

适合场景:

  • 实时性要求较高的场景,例如用户界面操作。
  • 调用简单且执行快速的操作。

优点

  • 简单直观:代码易于理解和编写,因为它是按顺序执行的。
  • 易于调试:由于执行顺序明确,调试起来相对容易。

缺点

  • 阻塞执行:如果一个操作需要很长时间来完成,那么整个程序会被阻塞,不能执行其他任务。
  • 资源浪费:在等待长时间操作完成时,CPU和其他资源可能会处于空闲状态。

异步调用:

特点:

  • 在异步调用中,调用者发出调用后无需等待被调用方完成就可以继续执行其他任务。
  • 被调用方完成操作后,通常会通过回调函数、事件通知或者完成信号等方式告知调用者。
  • 可以提高程序的并发能力和响应速度

适合场景:

  • 需要处理耗时操作,例如网络请求、文件I/O等。
  • 多个操作之间不存在严格的依赖关系。
  • 对系统性能和响应时间有较高要求的应用场景。

优点

  • 提高效率:异步调用可以使程序在等待某些耗时操作完成的同时执行其他任务,提高整体执行效率。
  • 资源利用率高:在等待耗时操作时,可以释放资源给其他任务使用,避免了资源浪费。
  • 更好的用户体验:在网络应用中,用户不必等待页面加载完成就能进行其他操作,提高了用户体验。

缺点

  • 复杂性增加:异步编程通常比同步编程更复杂,因为它涉及更多的控制结构和错误处理逻辑。
  • 调试困难:由于执行路径不是线性的,调试起来相对困难。

1.2 初识MQ

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker(消息代理/消息中间件):管理、存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接收者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

AI:消息队列的应用场景

  1. 异步处理:
  1. 场景: 当应用程序需要执行耗时的操作(如发送电子邮件、文件上传或下载等)时,可以将这些任务发送到消息队列中,由专门的任务处理程序异步执行。
  2. 好处: 减少了用户的等待时间,提高了用户体验。
  1. 解耦:
  1. 场景: 当一个系统由多个组件组成时,这些组件之间可以使用消息队列进行通信。
  2. 好处: 单个组件的变化不会直接影响到其他组件,提高了系统的可扩展性。
  1. 流量削峰
  1. 场景:在许多互联网应用和服务中,尤其是那些具有明显周期性流量特征的应用(如电商平台、社交网络等),常常会遇到流量突增的情况。例如,在电商促销期间,大量的用户会在短时间内访问网站并提交订单,这种短时间内产生的巨大流量可能会导致服务器过载,影响用户体验甚至导致服务不可用。
  2. 好处:当流量激增时接收到请求会被暂时存储在消息队列中,而不是直接发送到后端服务进行处理。这样做可以避免后端服务因为短时间内处理大量请求而过载。后端服务可以从消息队列中按需拉取消息进行处理。这种异步处理机制可以有效地分散流量峰值,确保后端服务的稳定运行。如下图:

目比较常见的MQ实现:

几种常见MQ的对比:

  • 追求可靠性:RabbitMQ、RocketMQ
  • 追求吞吐(高并发)能力:RocketMQ、Kafka
  • 追求消息低延迟:RabbitMQ、Kafka

这四种的MQ在市场都是非常流行,本课程讲解RabbitMQ。

Spring Boot默认支持AMQP协议,RabbitMQ支持AMQP协议 。

1.3 面试题

MQ有什么应用场景?

2 RabbitMQ入门

2.1 RabbitMQ介绍

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

RabbitMQ的架构如图:

其中包含几个概念:

  • publisher:生产者,也就是发送消息的应用程序
  • consumer:消费者,也就是消费消息的应用程序
  • queue:队列,存储消息的缓冲区。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理,下一节我们就一起来学习控制台的使用。

2.2. 安装

已帮大家装好mq,但是这里面有很多冗余的数据,你可以直接启动

  • docker ps -a:查看所有,可以看到有单机mq,和集群的mq1~3
  • docker start mq启动即可

或先删除【建议】

  • docker rm -f mq
  • 然后执行下面的启动脚本即可

或启动时候,命名一个新的

docker run \

-e RABBITMQ_DEFAULT_USER=itheima \

-e RABBITMQ_DEFAULT_PASS=123321 \

-v mq-plugins:/plugins \

--name mq197 \

-p 15672:15672 \

-p 5672:5672 \

-d \

rabbitmq:3.8-management

我们同样基于Docker来安装RabbitMQ,使用下面的命令即可:

找到课前资料下的mq.tar(rabbitmq的镜像文件),上传到/root下。

利用docker load命令加载:docker load -i mq.tar

执行下边的脚本创建容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.101.68:15672 即可看到管理控制台。首次访问需要登录,默认的用户名:itheima,密码:123321。登录后即可看到管理控制台总览页面:

2.3. 收发消息

2.3.1 搭建环境

RabbitMQ安装成功后下边我们编写消息发送与消息接收程序实现收发消息,如下图:publisher即消息发送者将消息发送到MQ的队列中,consumer即消息消费者从MQ中接收消息。

RabbitMQ通信采用了AMQP (Advanced Message Queuing Protocol) 协议,因此它具备跨语言的特性,任何语言只要遵循AMQP协议都可以使用RabbitMQ收发消息。

RabbitMQ官方也提供了各种不同语言的客户端API,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

下边使用SpringAMQP实现消息收发,上图是RabbitMQ最简单的工作模型,我们仅作测试使用,这种模式一般很少在生产中使用。

在课前资料给大家提供了一个Demo工程,方便我们学习SpringAMQP的使用:

将其复制到你的工作空间,然后用Idea打开,项目结构如图:

包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者

在mq-demo这个父工程中,已经配置好了SpringAMQP相关的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.itcast.demo</groupId>
  <artifactId>mq-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <modules>
    <module>publisher</module>
    <module>consumer</module>
  </modules>
  <packaging>pom</packaging>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.12</version>
    <relativePath/>
  </parent>
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
  </dependencies>
</project>

设置java版本

因此,子工程中就可以直接使用SpringAMQP了。

2.3.2 创建队列

首先进入RabbitMQ控制台创建队列,新建一个队列:simple.queue

添加成功:

接下来,我们就可以利用Java代码收发消息了。

2.3.3 消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在publisher服务的test下编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
        log.info("消息发送成功:{}",message);
    }
}

打开控制台,可以看到消息已经发送到队列中:

接下来,我们再来实现消息接收。

我们可以在RabbitMQ的控制台去查看消息

2.3.4 消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
    // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

2.3.5 测试

测试流程:

  • 启动consumer服务
  • 在publisher服务中运行测试代码,发送MQ消息。
  • 观察consumer控制台日志收到消息:

2.3.6 推模式与拉模式

在RabbitMQ中,消息传递给消费者的方式有两种:推模式(Push)和拉模式(Pull)。

  1. 推模式(Push):这是最常用的模式,在这种模式下,一旦消费者订阅了一个队列,RabbitMQ就会自动将队列中的消息发送给消费者,这种方式不需要消费者持续地询问是否有新的消息,而是由Broker在有消息时主动发送给消费者。
  2. 拉模式(Pull):在这种模式下,消费者需要主动请求Broker来获取消息。

在实践中,推模式更常见,因为它可以减少消费者的网络负载,并且可以让Broker更好地控制消息的传递速率。然而,拉模式也有其应用场景,比如当消费者想要精确控制消息获取的时候。

RabbitMQ默认采用的是推模式,但同时也提供了拉模式的支持,以满足不同的应用场景需求。

2.4. 数据隔离

2.4.1 用户管理

RabbitMQ支持多租户,多个系统可以同时使用一个RabbitMQ。

什么是多租户一种软件架构设计模式,允许多个租户(用户、组织或客户)共享同一套应用程序或系统实例,同时确保每个租户的数据和配置是隔离的。比如一张表增加一个companyId,那么就是不同的公司共用一套代码+数据库,但是又做了数据隔离

公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用虚拟主机进行隔离,将不同项目隶属不同的虚拟主机。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的虚拟主机,将每个项目的数据隔离。

下边学习用户管理和虚拟主机的配置。

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名
  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限
  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

下边我们给黑马商城创建一个新的用户,命名为hmall,密码123,注意选择adminstrator

你会发现此时hmall用户没有任何virtual host的访问权限:

接下来我们来给hmall用户授权。

2.4.2 虚拟主机

下边配置虚拟主机。

我们先退出登录:

切换到刚刚创建的hmall用户登录,然后点击Virtual Hosts菜单,进入virtual host管理页:

可以看到目前只有一个默认的virtual host,名字为 /

我们可以给黑马商城项目创建一个单独的virtual host,而不是使用默认的/

创建完成后如图:

由于我们是登录hmall账户后创建的virtual host,因此回到users菜单,你会发现当前用户已经具备了对/hmall这个virtual host的访问权限了:

此时,点击页面右上角的virtual host下拉菜单,切换virtual host/hmall

然后再次查看queues选项卡,会发现之前的队列已经看不到了:

这就是基于virtual host 的隔离效果。

2.4.3 测试

下边我们用hmall用户及新创建虚拟主机。

修改publisher及consumer的application.yml

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

重新进行收发消息的测试,发现消息发发放到了virtual-host: /hmall 下的队列,如下图:

再启动consumer服务接收消息。

3 RabbitMQ工作模型

3.1. 工作队列模型(WorkQueues)

3.1.1 介绍

在入门程序中实现的是一种最简单的工作队列模型,消费者直接绑定到队列上,如下图:

这种方式实现最基本的异步通信,一个生产者,一个队列,一个消费者,生产者将消息发到队列,消费者从队列接收消息。

由于绑定队列的消费者只有一个所以处理消息的能力就比较弱,下边情况将不适合这种方式:

  1. 如果有大量的任务就需要多个消费者去共同处理。
  2. 当生产者发送消息的速度远远大于消费者处理任务的速度此时由于消费者只有一个将造成消息堆积。

为了解决上边的问题可以使用到下边的方式,让多个消费者绑定到一个队列,共同消费队列中的消息

3.1.2 测试

3.1.2.1 创建队列

接下来我们测试工作队列模型。

首先,我们在控制台创建一个新的队列,命名为work.queue

3.1.2.2 发送消息程序

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/**
 * workQueue
 * 向队列中不停发送消息,模拟消息堆积。
 */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "work.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

3.1.2.3 接收消息程序

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

3.1.2.4 测试

测试流程:

  1. 启动ConsumerApplication。

观察mq控制台点击“work.queue”发现消费者程序和mq建立了两个通道(连接),在监听队列。

  1. 执行publisher服务中刚刚编写的发送方法testWorkQueue。
  2. 观察消费端控制台中消费消息的情况

最终结果如下:

消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
消费者1接收到消息:【hello, message_2】21:06:00.907454400
消费者1接收到消息:【hello, message_4】21:06:00.953332100
消费者1接收到消息:【hello, message_6】21:06:00.997867300
消费者1接收到消息:【hello, message_8】21:06:01.042178700
消费者2........接收到消息:【hello, message_3】21:06:01.086478800
消费者1接收到消息:【hello, message_10】21:06:01.087476600
消费者1接收到消息:【hello, message_12】21:06:01.132578300
消费者1接收到消息:【hello, message_14】21:06:01.175851200
消费者1接收到消息:【hello, message_16】21:06:01.218533400
消费者1接收到消息:【hello, message_18】21:06:01.261322900
消费者2........接收到消息:【hello, message_5】21:06:01.287003700
消费者1接收到消息:【hello, message_20】21:06:01.304412400
消费者1接收到消息:【hello, message_22】21:06:01.349950100
消费者1接收到消息:【hello, message_24】21:06:01.394533900
消费者1接收到消息:【hello, message_26】21:06:01.439876500
消费者1接收到消息:【hello, message_28】21:06:01.482937800
消费者2........接收到消息:【hello, message_7】21:06:01.488977100
消费者1接收到消息:【hello, message_30】21:06:01.526409300
消费者1接收到消息:【hello, message_32】21:06:01.572148
消费者1接收到消息:【hello, message_34】21:06:01.618264800
消费者1接收到消息:【hello, message_36】21:06:01.660780600
消费者2........接收到消息:【hello, message_9】21:06:01.689189300
消费者1接收到消息:【hello, message_38】21:06:01.705261
消费者1接收到消息:【hello, message_40】21:06:01.746927300
消费者1接收到消息:【hello, message_42】21:06:01.789835
消费者1接收到消息:【hello, message_44】21:06:01.834393100
消费者1接收到消息:【hello, message_46】21:06:01.875312100
消费者2........接收到消息:【hello, message_11】21:06:01.889969500
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_13】21:06:02.090725900
消费者2........接收到消息:【hello, message_15】21:06:02.293060600
消费者2........接收到消息:【hello, message_17】21:06:02.493748
消费者2........接收到消息:【hello, message_19】21:06:02.696635100
消费者2........接收到消息:【hello, message_21】21:06:02.896809700
消费者2........接收到消息:【hello, message_23】21:06:03.099533400
消费者2........接收到消息:【hello, message_25】21:06:03.301446400
消费者2........接收到消息:【hello, message_27】21:06:03.504999100
消费者2........接收到消息:【hello, message_29】21:06:03.705702500
消费者2........接收到消息:【hello, message_31】21:06:03.906601200
消费者2........接收到消息:【hello, message_33】21:06:04.108118500
消费者2........接收到消息:【hello, message_35】21:06:04.308945400
消费者2........接收到消息:【hello, message_37】21:06:04.511547700
消费者2........接收到消息:【hello, message_39】21:06:04.714038400
消费者2........接收到消息:【hello, message_41】21:06:04.916192700
消费者2........接收到消息:【hello, message_43】21:06:05.116286400
消费者2........接收到消息:【hello, message_45】21:06:05.318055100
消费者2........接收到消息:【hello, message_47】21:06:05.520656400
消费者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

3.1.2.5 能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:

消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
消费者1接收到消息:【hello, message_2】21:12:51.703625
消费者1接收到消息:【hello, message_3】21:12:51.724330100
消费者1接收到消息:【hello, message_4】21:12:51.746651100
消费者1接收到消息:【hello, message_5】21:12:51.768401400
消费者1接收到消息:【hello, message_6】21:12:51.790511400
消费者1接收到消息:【hello, message_7】21:12:51.812559800
消费者1接收到消息:【hello, message_8】21:12:51.834500600
消费者1接收到消息:【hello, message_9】21:12:51.857438800
消费者1接收到消息:【hello, message_10】21:12:51.880379600
消费者2........接收到消息:【hello, message_11】21:12:51.899327100
消费者1接收到消息:【hello, message_12】21:12:51.922828400
消费者1接收到消息:【hello, message_13】21:12:51.945617400
消费者1接收到消息:【hello, message_14】21:12:51.968942500
消费者1接收到消息:【hello, message_15】21:12:51.992215400
消费者1接收到消息:【hello, message_16】21:12:52.013325600
消费者1接收到消息:【hello, message_17】21:12:52.035687100
消费者1接收到消息:【hello, message_18】21:12:52.058188
消费者1接收到消息:【hello, message_19】21:12:52.081208400
消费者2........接收到消息:【hello, message_20】21:12:52.103406200
消费者1接收到消息:【hello, message_21】21:12:52.123827300
消费者1接收到消息:【hello, message_22】21:12:52.146165100
消费者1接收到消息:【hello, message_23】21:12:52.168828300
消费者1接收到消息:【hello, message_24】21:12:52.191769500
消费者1接收到消息:【hello, message_25】21:12:52.214839100
消费者1接收到消息:【hello, message_26】21:12:52.238998700
消费者1接收到消息:【hello, message_27】21:12:52.259772600
消费者1接收到消息:【hello, message_28】21:12:52.284131800
消费者2........接收到消息:【hello, message_29】21:12:52.306190600
消费者1接收到消息:【hello, message_30】21:12:52.325315800
消费者1接收到消息:【hello, message_31】21:12:52.347012500
消费者1接收到消息:【hello, message_32】21:12:52.368508600
消费者1接收到消息:【hello, message_33】21:12:52.391785100
消费者1接收到消息:【hello, message_34】21:12:52.416383800
消费者1接收到消息:【hello, message_35】21:12:52.439019
消费者1接收到消息:【hello, message_36】21:12:52.461733900
消费者1接收到消息:【hello, message_37】21:12:52.485990
消费者1接收到消息:【hello, message_38】21:12:52.509219900
消费者2........接收到消息:【hello, message_39】21:12:52.523683400
消费者1接收到消息:【hello, message_40】21:12:52.547412100
消费者1接收到消息:【hello, message_41】21:12:52.571191800
消费者1接收到消息:【hello, message_42】21:12:52.593024600
消费者1接收到消息:【hello, message_43】21:12:52.616731800
消费者1接收到消息:【hello, message_44】21:12:52.640317
消费者1接收到消息:【hello, message_45】21:12:52.663111100
消费者1接收到消息:【hello, message_46】21:12:52.686727
消费者1接收到消息:【hello, message_47】21:12:52.709266500
消费者2........接收到消息:【hello, message_48】21:12:52.725884900
消费者1接收到消息:【hello, message_49】21:12:52.746299900

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

3.1.3 小结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

3.2. 发布订阅模型(Publish/Subscribe)

3.2.1 介绍

工作队列模型一次只能将消息发给一个队列,绑定队列的多个消息者只能有一个消费者处理消息。如果一条消息要发给多个应用程序使用工作队列模型将无法实现。举例,下图中支付成功后支付服务将消息发给交易服务和通知服务,使用工作队列模型将无法实现。

相关文章
|
3天前
|
存储 负载均衡 算法
负载均衡算法
本文介绍多种负载均衡算法:随机、轮询、最小活跃数、源地址哈希及一致性哈希,涵盖适用场景与实现原理,结合代码与图示解析其调度机制,适用于分布式系统流量管理。
|
3天前
|
存储 Java 关系型数据库
微服务概述
本文介绍单体架构与微服务架构的区别,阐述微服务的定义、核心特征及优缺点,涵盖技术选型、部署方案与常见问题,帮助读者理解微服务演进逻辑,为后续实践打下理论基础。(238字)
|
3天前
|
XML JSON Java
什么是RESTful
RESTful是一种基于资源的API设计规范,强调URI代表资源、使用HTTP动词进行操作,实现统一标准、结构清晰、易于维护的接口风格,解决传统接口行为不规范问题。
|
3天前
|
存储 数据库
数据库设计三范式
本文讲解数据库三范式设计原则:第一范式要求字段原子性,不可再分;第二范式要求消除部分依赖,一张表只描述一件事;第三范式要求消除传递依赖。通过实例分析,说明范式旨在减少数据冗余、提升维护效率,但实际设计应结合业务需求灵活应用,而非盲目遵循。
|
3天前
|
负载均衡 Java 应用服务中间件
微服务网关与配置中心
本文介绍了微服务架构下的网关路由与鉴权机制,重点讲解使用Spring Cloud Gateway实现请求路由、负载均衡及JWT身份校验。通过Nacos实现服务发现,网关统一处理前端请求,解决多入口问题,并在全局过滤器中实现用户鉴权,保障系统安全。
|
3天前
|
关系型数据库 应用服务中间件 nginx
容器化部署引擎Docker
Docker是一种容器化技术,通过镜像打包应用及依赖,实现跨环境快速部署。它利用容器隔离运行应用程序,解决依赖冲突与环境差异问题,相比虚拟机更轻量、高效。
|
3天前
|
Java 数据库 微服务
微服务服务注册与发现
本文介绍了微服务架构的演进与实践。针对单体架构在团队协作、发布效率、扩展性等方面的局限,微服务通过将系统拆分为多个独立部署、单一职责的小型服务,实现高内聚、低耦合,提升系统的可维护性与伸缩能力。结合Spring Cloud与Spring Cloud Alibaba技术栈,文章以黑马商城项目为例,演示了如何创建微服务工程、进行服务拆分,并使用RestTemplate实现服务间远程调用,帮助开发者掌握微服务核心开发技能。
微服务服务注册与发现
|
3天前
|
负载均衡 Java 数据安全/隐私保护
Gateway服务网关
网关是微服务架构的统一入口,核心功能包括请求路由、权限控制、限流及负载均衡。通过Spring Cloud Gateway可实现高效路由转发与过滤器处理,支持跨域配置,提升系统安全与性能。
Gateway服务网关
|
3天前
|
JSON Java API
Feign远程调用
本文介绍了如何使用Feign替代RestTemplate实现微服务间的HTTP调用,涵盖依赖引入、注解配置、自定义日志、连接池优化及代码抽取等实践。通过Feign可简化远程调用,提升开发效率,并结合最佳实践实现代码复用与解耦。
|
3天前
|
Shell 测试技术 Apache
Jmeter快速入门
本文介绍了Apache JMeter的下载、解压与运行方法,并指导用户进行中文语言设置及基本使用。通过添加线程组、HTTP取样器和监听器,快速完成性能测试配置,适合初学者入门学习。