RabbitMQ工作模型

简介: 工作队列模型通过多个消费者共同消费一个队列中的消息,实现任务的并行处理。默认情况下,消息平均分配给消费者,可能导致处理能力不同的消费者负载不均。通过设置`prefetch=1`,可实现“能者多劳”,即处理速度快的消费者自动接收更多消息,提升整体效率。发布订阅模型则通过交换机(Exchange)将一条消息转发给多个队列,支持Fanout、Direct、Topic等类型交换机,实现广播或多条件路由消息,满足不同业务场景需求。

3.1. 工作队列模型(WorkQueues)

3.1.1 介绍

在入门程序中实现的是一种最简单的工作队列模型,消费者直接绑定到队列上,如下图:

这种方式实现最基本的异步通信,一个生产者,一个队列,一个消费者,生产者将消息发到队列,消费者从队列接收消息。

由于绑定队列的消费者只有一个所以处理消息的能力就比较弱,下边情况将不适合这种方式:

  1. 如果有大量的任务就需要多个消费者去共同处理。
  2. 当生产者发送消息的速度远远大于消费者处理任务的速度此时由于消费者只有一个将造成消息堆积。

为了解决上边的问题可以使用到下边的方式,让多个消费者绑定到一个队列,共同消费队列中的消息

3.1.2 测试

3.1.2.1 创建队列

接下来我们测试工作队列模型。

首先,我们在控制台创建一个新的队列,命名为work.queue

3.1.2.2 发送消息程序

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/**
 * workQueue
 * 向队列中不停发送消息,模拟消息堆积。
 */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "work.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

3.1.2.3 接收消息程序

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

3.1.2.4 测试

测试流程:

  1. 启动ConsumerApplication。

观察mq控制台点击“work.queue”发现消费者程序和mq建立了两个通道(连接),在监听队列。

  1. 执行publisher服务中刚刚编写的发送方法testWorkQueue。
  2. 观察消费端控制台中消费消息的情况

最终结果如下:

消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
消费者1接收到消息:【hello, message_2】21:06:00.907454400
消费者1接收到消息:【hello, message_4】21:06:00.953332100
消费者1接收到消息:【hello, message_6】21:06:00.997867300
消费者1接收到消息:【hello, message_8】21:06:01.042178700
消费者2........接收到消息:【hello, message_3】21:06:01.086478800
消费者1接收到消息:【hello, message_10】21:06:01.087476600
消费者1接收到消息:【hello, message_12】21:06:01.132578300
消费者1接收到消息:【hello, message_14】21:06:01.175851200
消费者1接收到消息:【hello, message_16】21:06:01.218533400
消费者1接收到消息:【hello, message_18】21:06:01.261322900
消费者2........接收到消息:【hello, message_5】21:06:01.287003700
消费者1接收到消息:【hello, message_20】21:06:01.304412400
消费者1接收到消息:【hello, message_22】21:06:01.349950100
消费者1接收到消息:【hello, message_24】21:06:01.394533900
消费者1接收到消息:【hello, message_26】21:06:01.439876500
消费者1接收到消息:【hello, message_28】21:06:01.482937800
消费者2........接收到消息:【hello, message_7】21:06:01.488977100
消费者1接收到消息:【hello, message_30】21:06:01.526409300
消费者1接收到消息:【hello, message_32】21:06:01.572148
消费者1接收到消息:【hello, message_34】21:06:01.618264800
消费者1接收到消息:【hello, message_36】21:06:01.660780600
消费者2........接收到消息:【hello, message_9】21:06:01.689189300
消费者1接收到消息:【hello, message_38】21:06:01.705261
消费者1接收到消息:【hello, message_40】21:06:01.746927300
消费者1接收到消息:【hello, message_42】21:06:01.789835
消费者1接收到消息:【hello, message_44】21:06:01.834393100
消费者1接收到消息:【hello, message_46】21:06:01.875312100
消费者2........接收到消息:【hello, message_11】21:06:01.889969500
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_13】21:06:02.090725900
消费者2........接收到消息:【hello, message_15】21:06:02.293060600
消费者2........接收到消息:【hello, message_17】21:06:02.493748
消费者2........接收到消息:【hello, message_19】21:06:02.696635100
消费者2........接收到消息:【hello, message_21】21:06:02.896809700
消费者2........接收到消息:【hello, message_23】21:06:03.099533400
消费者2........接收到消息:【hello, message_25】21:06:03.301446400
消费者2........接收到消息:【hello, message_27】21:06:03.504999100
消费者2........接收到消息:【hello, message_29】21:06:03.705702500
消费者2........接收到消息:【hello, message_31】21:06:03.906601200
消费者2........接收到消息:【hello, message_33】21:06:04.108118500
消费者2........接收到消息:【hello, message_35】21:06:04.308945400
消费者2........接收到消息:【hello, message_37】21:06:04.511547700
消费者2........接收到消息:【hello, message_39】21:06:04.714038400
消费者2........接收到消息:【hello, message_41】21:06:04.916192700
消费者2........接收到消息:【hello, message_43】21:06:05.116286400
消费者2........接收到消息:【hello, message_45】21:06:05.318055100
消费者2........接收到消息:【hello, message_47】21:06:05.520656400
消费者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

3.1.2.5 能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:

消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
消费者1接收到消息:【hello, message_2】21:12:51.703625
消费者1接收到消息:【hello, message_3】21:12:51.724330100
消费者1接收到消息:【hello, message_4】21:12:51.746651100
消费者1接收到消息:【hello, message_5】21:12:51.768401400
消费者1接收到消息:【hello, message_6】21:12:51.790511400
消费者1接收到消息:【hello, message_7】21:12:51.812559800
消费者1接收到消息:【hello, message_8】21:12:51.834500600
消费者1接收到消息:【hello, message_9】21:12:51.857438800
消费者1接收到消息:【hello, message_10】21:12:51.880379600
消费者2........接收到消息:【hello, message_11】21:12:51.899327100
消费者1接收到消息:【hello, message_12】21:12:51.922828400
消费者1接收到消息:【hello, message_13】21:12:51.945617400
消费者1接收到消息:【hello, message_14】21:12:51.968942500
消费者1接收到消息:【hello, message_15】21:12:51.992215400
消费者1接收到消息:【hello, message_16】21:12:52.013325600
消费者1接收到消息:【hello, message_17】21:12:52.035687100
消费者1接收到消息:【hello, message_18】21:12:52.058188
消费者1接收到消息:【hello, message_19】21:12:52.081208400
消费者2........接收到消息:【hello, message_20】21:12:52.103406200
消费者1接收到消息:【hello, message_21】21:12:52.123827300
消费者1接收到消息:【hello, message_22】21:12:52.146165100
消费者1接收到消息:【hello, message_23】21:12:52.168828300
消费者1接收到消息:【hello, message_24】21:12:52.191769500
消费者1接收到消息:【hello, message_25】21:12:52.214839100
消费者1接收到消息:【hello, message_26】21:12:52.238998700
消费者1接收到消息:【hello, message_27】21:12:52.259772600
消费者1接收到消息:【hello, message_28】21:12:52.284131800
消费者2........接收到消息:【hello, message_29】21:12:52.306190600
消费者1接收到消息:【hello, message_30】21:12:52.325315800
消费者1接收到消息:【hello, message_31】21:12:52.347012500
消费者1接收到消息:【hello, message_32】21:12:52.368508600
消费者1接收到消息:【hello, message_33】21:12:52.391785100
消费者1接收到消息:【hello, message_34】21:12:52.416383800
消费者1接收到消息:【hello, message_35】21:12:52.439019
消费者1接收到消息:【hello, message_36】21:12:52.461733900
消费者1接收到消息:【hello, message_37】21:12:52.485990
消费者1接收到消息:【hello, message_38】21:12:52.509219900
消费者2........接收到消息:【hello, message_39】21:12:52.523683400
消费者1接收到消息:【hello, message_40】21:12:52.547412100
消费者1接收到消息:【hello, message_41】21:12:52.571191800
消费者1接收到消息:【hello, message_42】21:12:52.593024600
消费者1接收到消息:【hello, message_43】21:12:52.616731800
消费者1接收到消息:【hello, message_44】21:12:52.640317
消费者1接收到消息:【hello, message_45】21:12:52.663111100
消费者1接收到消息:【hello, message_46】21:12:52.686727
消费者1接收到消息:【hello, message_47】21:12:52.709266500
消费者2........接收到消息:【hello, message_48】21:12:52.725884900
消费者1接收到消息:【hello, message_49】21:12:52.746299900

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

3.1.3 小结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

3.2. 发布订阅模型(Publish/Subscribe)

3.2.1 介绍

工作队列模型一次只能将消息发给一个队列,绑定队列的多个消息者只能有一个消费者处理消息。如果一条消息要发给多个应用程序使用工作队列模型将无法实现。举例,下图中支付成功后支付服务将消息发给交易服务和通知服务,使用工作队列模型将无法实现。

使用发布订阅模型可以实现上图的需求,发布订阅模型可以实现一条消息发给多个队列,每个队列绑定到同一个交换机,最终实现了向多个消费者发送一条消息,这种模式称为“发布/订阅”模型。

发布订阅模型里,生产者只能将消息发送到交换机,由交换机将消息推送到队列,交换机可以将消息推送给绑定它的所有队列,也可以有针对性的将消息推送给某几个队列,这就相当于有一批消费者订阅了消息,交换机根据各自的订阅去推送消息,组成部分如下:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,将消息推送给队列,是将消息推送给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于交换机的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力[可以暂存,但不支持持久化],因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.2.2 交换机类型

交换机是如何实现将消息推送给所有队列,还是有针对性的将消息推送给某几个队列呢?

实现不同的需求要选用不同类型的交换机,可用的交换机类型:direct, topic, headersfanout.

  • Fanout:广播类型,将消息交给所有绑定到交换机的队列。
  • Direct:直接类型,基于RoutingKey(路由key)发送给订阅了消息的队列,交换机根据routingkey去判断消息应该转发到哪个队列
  • Topic:通配符类型(主题类型),与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配类型,基于MQ的消息头匹配,用的较少。

课堂中,我们讲解前面的三种交换机模式。

3.3. Fanout交换机

3.3.1 介绍

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者接收消息,每个队列订阅的消费者只有一个能拿到消息。

3.3.2 测试

3.3.2.1 创建队列

我们的计划是这样的:

  • 创建一个名为 hmall.fanout的交换机,类型是Fanout
  • 创建两个队列fanout.queue1fanout.queue2,绑定到交换机hmall.fanout

在控制台创建队列fanout.queue1:

在创建一个队列fanout.queue2

3.3.2.2 创建交换机

然后再创建一个交换机hmall.fanout:

3.3.2.3 绑定队列到交换机

然后绑定两个队列到交换机:

3.3.2.4 发送消息

下边实现消息发送:

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "hmall.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

发送成功查看mq控制台消息转发到了绑定此交换机的两个队列

3.3.2.5 接收消息

下边实现消息接收:

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

将consumer服务启动起来后可以通过rabbitmq控制台查看消费者监听情况

进入队列界面,点击fanout.queue1:

进入队列界面,查看consumers,下图表示fanout.queue1队列有一个监听者。

同样的方法可以查看fanout.queue2队列的监听情况。

fanout.queue1和fanout.queue2每个队列都有一个监听者。

下边执行发送消息程序,观察控制台,下图说明每个消费者成功收到消息。

消费者1接收到Fanout消息:【hello, everyone!】
消费者2接收到Fanout消息:【hello, everyone!】

3.3.2.6 启动多个消费者实例

下边我们把consumer服务启动两个实例

此时再观察fanout.queue1和fanout.queue2的监听者,发现每个队列有两个监听者

此时的结果相当于下图:

此时执行发送消息程序后四个消息者都可以收到消息吗?

通过测试我们发现:

在每个队列的消费者中,发送一条消息只会有一个消费者接收到消息。

每个队列默认采用轮询的方式向消费者推送消息。

3.3.3 小结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

3.4.Direct交换机

相关文章
|
3月前
|
人工智能 监控 Java
请求限流
本文介绍如何使用Sentinel实现接口限流与降级,通过配置QPS阈值保护商品查询接口,并结合JMeter进行压测验证。同时讲解了线程隔离机制,包括信号量隔离的应用,确保系统在高并发下的稳定性。
请求限流
|
监控 项目管理
PMBOK泛读(第四章) - 项目整合管理(二)
PMBOK泛读(第四章) - 项目整合管理(二)
300 0
|
8月前
|
存储 人工智能 自然语言处理
DeepSeek R1+Open WebUI实现本地知识库的搭建和局域网访问
本文介绍了使用 DeepSeek R1 和 Open WebUI 搭建本地知识库的详细步骤与注意事项,涵盖核心组件介绍、硬件与软件准备、模型部署、知识库构建及问答功能实现等内容,适用于本地文档存储、向量化与检索增强生成(RAG)场景的应用开发。
3004 0
|
调度 C# Windows
震惊!Windows Service服务和定时任务框架quartz之间原来是这种关系……(下)
震惊!Windows Service服务和定时任务框架quartz之间原来是这种关系……(下)
|
2月前
|
运维 安全 Linux
【运维实战】企业级 NFS 文件共享服务 · 一键自动化部署方案 (龙蜥/银河麒麟 V10 /openEuler /CentOS)
面向国产化替代需求,阜阳云动科技推出企业级NFS一键部署脚本,支持银河麒麟V10、openEuler、CentOS、龙蜥等主流系统。实现多部门隔离、权限可控、跨平台兼容的文件共享服务,自动化创建用户、目录与客户端挂载脚本,显著提升部署效率与安全性。开箱即用,助力政企高效构建安全稳定的NFS共享环境。
436 1
【运维实战】企业级 NFS 文件共享服务 · 一键自动化部署方案 (龙蜥/银河麒麟 V10 /openEuler /CentOS)
|
2月前
|
安全 测试技术 网络安全
2026年移动应用渗透测试流程方案及iOS与Android框架对比
文章聚焦2026年移动应用渗透测试,介绍其涵盖信息收集等关键环节且将更重自动化与AI辅助,对比iOS与Android因系统差异在框架上的显著区别。还阐述主流测试方案及优劣势,给出企业实施的最佳实践与落地路径,解答常见问题,助力企业应对安全挑战。
|
3月前
|
Java Shell 测试技术
Jmeter快速入门
本文介绍JMeter的安装与快速入门。需先安装JDK并配置环境变量,再下载解压JMeter,通过bin目录启动。首次运行可设中文界面,建议修改配置文件实现永久设置。随后演示创建线程组、添加HTTP取样器及监听器,完成简单性能测试流程。
 Jmeter快速入门
|
3月前
|
自然语言处理 安全 Java
Spring Boot中集成Lucence
本文介绍了Lucene全文检索原理及其在Spring Boot中的集成应用。首先解析了全文检索的核心思想——通过建立索引提升搜索效率,然后详细演示了Lucene的分词、索引构建与查询过程,并结合中文分词、高亮显示等实战功能,展示了其在实际项目中的灵活运用。
|
3月前
|
存储 数据库
数据库设计三范式
数据库三范式是设计合理表结构的指导原则:第一范式要求字段原子性、不可再分;第二范式要求消除部分依赖,即主键确定所有非主键;第三范式要求消除传递依赖。但实际应用中应结合项目需求灵活调整,避免过度规范化带来复杂性。
|
3月前
|
人工智能 监控 安全
别等上线才后悔!AI应用测试的5个维度与4类实战避坑指南
某心理App上线AI打卡功能后因“张冠李戴”遭投诉,暴露大模型测试新挑战。传统测试难应对AI的不确定性,需构建覆盖准确性、鲁棒性、安全性、性能与合规性的五维框架,从“流程检验”转向“智能体守护”,确保AI输出精准、安全、有温度。