RabbitMQ工作模型

简介: 工作队列模型通过多个消费者共同消费一个队列中的消息,实现任务的并行处理。默认情况下,消息平均分配给消费者,可能导致处理能力不同的消费者负载不均。通过设置`prefetch=1`,可实现“能者多劳”,即处理速度快的消费者自动接收更多消息,提升整体效率。发布订阅模型则通过交换机(Exchange)将一条消息转发给多个队列,支持Fanout、Direct、Topic等类型交换机,实现广播或多条件路由消息,满足不同业务场景需求。

3.1. 工作队列模型(WorkQueues)

3.1.1 介绍

在入门程序中实现的是一种最简单的工作队列模型,消费者直接绑定到队列上,如下图:

这种方式实现最基本的异步通信,一个生产者,一个队列,一个消费者,生产者将消息发到队列,消费者从队列接收消息。

由于绑定队列的消费者只有一个所以处理消息的能力就比较弱,下边情况将不适合这种方式:

  1. 如果有大量的任务就需要多个消费者去共同处理。
  2. 当生产者发送消息的速度远远大于消费者处理任务的速度此时由于消费者只有一个将造成消息堆积。

为了解决上边的问题可以使用到下边的方式,让多个消费者绑定到一个队列,共同消费队列中的消息

3.1.2 测试

3.1.2.1 创建队列

接下来我们测试工作队列模型。

首先,我们在控制台创建一个新的队列,命名为work.queue

3.1.2.2 发送消息程序

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/**
 * workQueue
 * 向队列中不停发送消息,模拟消息堆积。
 */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "work.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

3.1.2.3 接收消息程序

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

3.1.2.4 测试

测试流程:

  1. 启动ConsumerApplication。

观察mq控制台点击“work.queue”发现消费者程序和mq建立了两个通道(连接),在监听队列。

  1. 执行publisher服务中刚刚编写的发送方法testWorkQueue。
  2. 观察消费端控制台中消费消息的情况

最终结果如下:

消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
消费者1接收到消息:【hello, message_2】21:06:00.907454400
消费者1接收到消息:【hello, message_4】21:06:00.953332100
消费者1接收到消息:【hello, message_6】21:06:00.997867300
消费者1接收到消息:【hello, message_8】21:06:01.042178700
消费者2........接收到消息:【hello, message_3】21:06:01.086478800
消费者1接收到消息:【hello, message_10】21:06:01.087476600
消费者1接收到消息:【hello, message_12】21:06:01.132578300
消费者1接收到消息:【hello, message_14】21:06:01.175851200
消费者1接收到消息:【hello, message_16】21:06:01.218533400
消费者1接收到消息:【hello, message_18】21:06:01.261322900
消费者2........接收到消息:【hello, message_5】21:06:01.287003700
消费者1接收到消息:【hello, message_20】21:06:01.304412400
消费者1接收到消息:【hello, message_22】21:06:01.349950100
消费者1接收到消息:【hello, message_24】21:06:01.394533900
消费者1接收到消息:【hello, message_26】21:06:01.439876500
消费者1接收到消息:【hello, message_28】21:06:01.482937800
消费者2........接收到消息:【hello, message_7】21:06:01.488977100
消费者1接收到消息:【hello, message_30】21:06:01.526409300
消费者1接收到消息:【hello, message_32】21:06:01.572148
消费者1接收到消息:【hello, message_34】21:06:01.618264800
消费者1接收到消息:【hello, message_36】21:06:01.660780600
消费者2........接收到消息:【hello, message_9】21:06:01.689189300
消费者1接收到消息:【hello, message_38】21:06:01.705261
消费者1接收到消息:【hello, message_40】21:06:01.746927300
消费者1接收到消息:【hello, message_42】21:06:01.789835
消费者1接收到消息:【hello, message_44】21:06:01.834393100
消费者1接收到消息:【hello, message_46】21:06:01.875312100
消费者2........接收到消息:【hello, message_11】21:06:01.889969500
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_13】21:06:02.090725900
消费者2........接收到消息:【hello, message_15】21:06:02.293060600
消费者2........接收到消息:【hello, message_17】21:06:02.493748
消费者2........接收到消息:【hello, message_19】21:06:02.696635100
消费者2........接收到消息:【hello, message_21】21:06:02.896809700
消费者2........接收到消息:【hello, message_23】21:06:03.099533400
消费者2........接收到消息:【hello, message_25】21:06:03.301446400
消费者2........接收到消息:【hello, message_27】21:06:03.504999100
消费者2........接收到消息:【hello, message_29】21:06:03.705702500
消费者2........接收到消息:【hello, message_31】21:06:03.906601200
消费者2........接收到消息:【hello, message_33】21:06:04.108118500
消费者2........接收到消息:【hello, message_35】21:06:04.308945400
消费者2........接收到消息:【hello, message_37】21:06:04.511547700
消费者2........接收到消息:【hello, message_39】21:06:04.714038400
消费者2........接收到消息:【hello, message_41】21:06:04.916192700
消费者2........接收到消息:【hello, message_43】21:06:05.116286400
消费者2........接收到消息:【hello, message_45】21:06:05.318055100
消费者2........接收到消息:【hello, message_47】21:06:05.520656400
消费者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

3.1.2.5 能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:

消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
消费者1接收到消息:【hello, message_2】21:12:51.703625
消费者1接收到消息:【hello, message_3】21:12:51.724330100
消费者1接收到消息:【hello, message_4】21:12:51.746651100
消费者1接收到消息:【hello, message_5】21:12:51.768401400
消费者1接收到消息:【hello, message_6】21:12:51.790511400
消费者1接收到消息:【hello, message_7】21:12:51.812559800
消费者1接收到消息:【hello, message_8】21:12:51.834500600
消费者1接收到消息:【hello, message_9】21:12:51.857438800
消费者1接收到消息:【hello, message_10】21:12:51.880379600
消费者2........接收到消息:【hello, message_11】21:12:51.899327100
消费者1接收到消息:【hello, message_12】21:12:51.922828400
消费者1接收到消息:【hello, message_13】21:12:51.945617400
消费者1接收到消息:【hello, message_14】21:12:51.968942500
消费者1接收到消息:【hello, message_15】21:12:51.992215400
消费者1接收到消息:【hello, message_16】21:12:52.013325600
消费者1接收到消息:【hello, message_17】21:12:52.035687100
消费者1接收到消息:【hello, message_18】21:12:52.058188
消费者1接收到消息:【hello, message_19】21:12:52.081208400
消费者2........接收到消息:【hello, message_20】21:12:52.103406200
消费者1接收到消息:【hello, message_21】21:12:52.123827300
消费者1接收到消息:【hello, message_22】21:12:52.146165100
消费者1接收到消息:【hello, message_23】21:12:52.168828300
消费者1接收到消息:【hello, message_24】21:12:52.191769500
消费者1接收到消息:【hello, message_25】21:12:52.214839100
消费者1接收到消息:【hello, message_26】21:12:52.238998700
消费者1接收到消息:【hello, message_27】21:12:52.259772600
消费者1接收到消息:【hello, message_28】21:12:52.284131800
消费者2........接收到消息:【hello, message_29】21:12:52.306190600
消费者1接收到消息:【hello, message_30】21:12:52.325315800
消费者1接收到消息:【hello, message_31】21:12:52.347012500
消费者1接收到消息:【hello, message_32】21:12:52.368508600
消费者1接收到消息:【hello, message_33】21:12:52.391785100
消费者1接收到消息:【hello, message_34】21:12:52.416383800
消费者1接收到消息:【hello, message_35】21:12:52.439019
消费者1接收到消息:【hello, message_36】21:12:52.461733900
消费者1接收到消息:【hello, message_37】21:12:52.485990
消费者1接收到消息:【hello, message_38】21:12:52.509219900
消费者2........接收到消息:【hello, message_39】21:12:52.523683400
消费者1接收到消息:【hello, message_40】21:12:52.547412100
消费者1接收到消息:【hello, message_41】21:12:52.571191800
消费者1接收到消息:【hello, message_42】21:12:52.593024600
消费者1接收到消息:【hello, message_43】21:12:52.616731800
消费者1接收到消息:【hello, message_44】21:12:52.640317
消费者1接收到消息:【hello, message_45】21:12:52.663111100
消费者1接收到消息:【hello, message_46】21:12:52.686727
消费者1接收到消息:【hello, message_47】21:12:52.709266500
消费者2........接收到消息:【hello, message_48】21:12:52.725884900
消费者1接收到消息:【hello, message_49】21:12:52.746299900

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

3.1.3 小结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

3.2. 发布订阅模型(Publish/Subscribe)

3.2.1 介绍

工作队列模型一次只能将消息发给一个队列,绑定队列的多个消息者只能有一个消费者处理消息。如果一条消息要发给多个应用程序使用工作队列模型将无法实现。举例,下图中支付成功后支付服务将消息发给交易服务和通知服务,使用工作队列模型将无法实现。

使用发布订阅模型可以实现上图的需求,发布订阅模型可以实现一条消息发给多个队列,每个队列绑定到同一个交换机,最终实现了向多个消费者发送一条消息,这种模式称为“发布/订阅”模型。

发布订阅模型里,生产者只能将消息发送到交换机,由交换机将消息推送到队列,交换机可以将消息推送给绑定它的所有队列,也可以有针对性的将消息推送给某几个队列,这就相当于有一批消费者订阅了消息,交换机根据各自的订阅去推送消息,组成部分如下:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,将消息推送给队列,是将消息推送给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于交换机的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力[可以暂存,但不支持持久化],因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.2.2 交换机类型

交换机是如何实现将消息推送给所有队列,还是有针对性的将消息推送给某几个队列呢?

实现不同的需求要选用不同类型的交换机,可用的交换机类型:direct, topic, headersfanout.

  • Fanout:广播类型,将消息交给所有绑定到交换机的队列。
  • Direct:直接类型,基于RoutingKey(路由key)发送给订阅了消息的队列,交换机根据routingkey去判断消息应该转发到哪个队列
  • Topic:通配符类型(主题类型),与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配类型,基于MQ的消息头匹配,用的较少。

课堂中,我们讲解前面的三种交换机模式。

3.3. Fanout交换机

3.3.1 介绍

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者接收消息,每个队列订阅的消费者只有一个能拿到消息。

3.3.2 测试

3.3.2.1 创建队列

我们的计划是这样的:

  • 创建一个名为 hmall.fanout的交换机,类型是Fanout
  • 创建两个队列fanout.queue1fanout.queue2,绑定到交换机hmall.fanout

在控制台创建队列fanout.queue1:

在创建一个队列fanout.queue2

3.3.2.2 创建交换机

然后再创建一个交换机hmall.fanout:

3.3.2.3 绑定队列到交换机

然后绑定两个队列到交换机:

3.3.2.4 发送消息

下边实现消息发送:

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "hmall.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

发送成功查看mq控制台消息转发到了绑定此交换机的两个队列

3.3.2.5 接收消息

下边实现消息接收:

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

将consumer服务启动起来后可以通过rabbitmq控制台查看消费者监听情况

进入队列界面,点击fanout.queue1:

进入队列界面,查看consumers,下图表示fanout.queue1队列有一个监听者。

同样的方法可以查看fanout.queue2队列的监听情况。

fanout.queue1和fanout.queue2每个队列都有一个监听者。

下边执行发送消息程序,观察控制台,下图说明每个消费者成功收到消息。

消费者1接收到Fanout消息:【hello, everyone!】
消费者2接收到Fanout消息:【hello, everyone!】

3.3.2.6 启动多个消费者实例

下边我们把consumer服务启动两个实例

此时再观察fanout.queue1和fanout.queue2的监听者,发现每个队列有两个监听者

此时的结果相当于下图:

此时执行发送消息程序后四个消息者都可以收到消息吗?

通过测试我们发现:

在每个队列的消费者中,发送一条消息只会有一个消费者接收到消息。

每个队列默认采用轮询的方式向消费者推送消息。

3.3.3 小结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

3.4.Direct交换机

相关文章
|
1天前
|
uml C语言
系统时序图
时序图是UML中描述对象间消息传递时间顺序的交互图,横轴为对象,纵轴为时间。用于展示交互流程、强调时序关系,直观表达并发过程。主要元素包括角色、对象、生命线、控制焦点和消息等,广泛应用于系统设计与分析。
系统时序图
|
2天前
|
消息中间件 存储 数据挖掘
应用架构图
技术架构是将业务需求转化为技术实现的关键过程,涵盖分层设计、技术选型与系统集成。本文详解单体与分布式架构,包括展现层、业务层、数据层及基础层的构建逻辑,并通过调用关系图明确系统边界与外部依赖,支撑高效稳定的技术体系落地。
应用架构图
Topic交换机(自行测试)
Topic交换机支持通配符匹配RoutingKey,实现灵活路由。BindingKey用`.`分隔,`*`匹配一个词,`#`匹配零个或多个词。相比Direct交换机,Topic更适用于复杂路由场景。
Direct交换机
Direct交换机根据RoutingKey将消息路由到指定队列,实现精准消息分发。与Fanout广播模式不同,Direct支持多队列绑定相同Key,兼具灵活性与定向投递优势。
Direct交换机
|
1天前
|
项目管理 开发者
业务架构图
本文系统阐述了业务架构图的核心概念与绘制方法,涵盖业务定义、架构分层(组织层、应用层、能力层、基础层)、模块划分及功能分解,并结合医院场景示例,说明如何通过分层、分模块、分功能构建清晰的业务视图,提升客户理解与开发效率。
|
1天前
|
存储 Dubbo API
SpringCloud工程部署启动
本文介绍SpringCloud微服务工程搭建全过程,涵盖项目创建、数据库配置、服务部署及远程调用实现。通过两种方案导入工程,完成user-service与order-service模块开发,并利用RestTemplate实现跨服务数据调用,帮助理解微服务间通信机制及拆分逻辑。
|
1天前
|
SQL 关系型数据库 数据库
分布式事务
本文介绍了分布式事务的概念、典型场景及解决方案。在微服务架构下,一次业务操作需跨多个数据库和远程调用协作完成,传统本地事务无法保证整体一致性。通过Seata框架可实现分布式事务控制,其AT模式无侵入、高性能,基于两阶段提交与undo log实现最终一致;XA模式则提供强一致性但性能较低。文章还结合下单、支付等场景演示了Seata的集成与应用。
|
1天前
|
人工智能 监控 Java
请求限流
本文介绍如何使用Sentinel实现接口限流与降级,通过配置QPS阈值保护商品查询接口,并结合JMeter进行压测验证。同时讲解了线程隔离机制,包括信号量隔离的应用,确保系统在高并发下的稳定性。
请求限流
|
1天前
|
监控 Java Sentinel
熔断降级
熔断降级是防止服务雪崩的核心机制,通过Sentinel实现。熔断由客户端断路器统计异常或慢请求比例,超阈值后拦截请求;降级则返回默认数据保障体验。结合使用可快速失败、避免级联故障。
|
1天前
|
人工智能 Java 应用服务中间件
微服务保护
本节介绍微服务雪崩问题及保护方案。当某服务故障或负载过高,可能引发级联失败,导致整个系统不可用。为避免此问题,需采取熔断、降级、超时、线程隔离和限流等措施。常用工具包括Hystrix、Resilience4j和Sentinel,课程重点讲解Sentinel的使用。