3 RabbitMQ工作模型

简介: 工作队列模型允许多个消费者绑定同一队列,实现消息的并发处理。通过设置`prefetch=1`,可实现“能者多劳”,即处理能力强的消费者自动接收更多消息,避免消息积压,提升整体处理效率。

3.1. 工作队列模型(WorkQueues)
3.1.1 介绍
在入门程序中实现的是一种最简单的工作队列模型,消费者直接绑定到队列上,如下图:

这种方式实现最基本的异步通信,一个生产者,一个队列,一个消费者,生产者将消息发到队列,消费者从队列接收消息。
由于绑定队列的消费者只有一个所以处理消息的能力就比较弱,下边情况将不适合这种方式:

  1. 如果有大量的任务就需要多个消费者去共同处理。
  2. 当生产者发送消息的速度远远大于消费者处理任务的速度此时由于消费者只有一个将造成消息堆积。
    为了解决上边的问题可以使用到下边的方式,让多个消费者绑定到一个队列,共同消费队列中的消息。

3.1.2 测试
3.1.2.1 创建队列
接下来我们测试工作队列模型。
首先,我们在控制台创建一个新的队列,命名为work.queue:

3.1.2.2 发送消息程序
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/**

  • workQueue
  • 向队列中不停发送消息,模拟消息堆积。
    */
    @Test
    public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "work.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
     // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
     rabbitTemplate.convertAndSend(queueName, message + i);
     Thread.sleep(20);
    
    }
    }
    3.1.2.3 接收消息程序
    要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
    }

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:
● 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
● 消费者2 sleep了200毫秒,相当于每秒处理5个消息
3.1.2.4 测试
测试流程:

  1. 启动ConsumerApplication。
    观察mq控制台点击“work.queue”发现消费者程序和mq建立了两个通道(连接),在监听队列。

  2. 执行publisher服务中刚刚编写的发送方法testWorkQueue。

  3. 观察消费端控制台中消费消息的情况
    最终结果如下:
    消费者1接收到消息:【hello, message_0】21:06:00.869555300
    消费者2........接收到消息:【hello, message_1】21:06:00.884518
    消费者1接收到消息:【hello, message_2】21:06:00.907454400
    消费者1接收到消息:【hello, message_4】21:06:00.953332100
    消费者1接收到消息:【hello, message_6】21:06:00.997867300
    消费者1接收到消息:【hello, message_8】21:06:01.042178700
    消费者2........接收到消息:【hello, message_3】21:06:01.086478800
    消费者1接收到消息:【hello, message_10】21:06:01.087476600
    消费者1接收到消息:【hello, message_12】21:06:01.132578300
    消费者1接收到消息:【hello, message_14】21:06:01.175851200
    消费者1接收到消息:【hello, message_16】21:06:01.218533400
    消费者1接收到消息:【hello, message_18】21:06:01.261322900
    消费者2........接收到消息:【hello, message_5】21:06:01.287003700
    消费者1接收到消息:【hello, message_20】21:06:01.304412400
    消费者1接收到消息:【hello, message_22】21:06:01.349950100
    消费者1接收到消息:【hello, message_24】21:06:01.394533900
    消费者1接收到消息:【hello, message_26】21:06:01.439876500
    消费者1接收到消息:【hello, message_28】21:06:01.482937800
    消费者2........接收到消息:【hello, message_7】21:06:01.488977100
    消费者1接收到消息:【hello, message_30】21:06:01.526409300
    消费者1接收到消息:【hello, message_32】21:06:01.572148
    消费者1接收到消息:【hello, message_34】21:06:01.618264800
    消费者1接收到消息:【hello, message_36】21:06:01.660780600
    消费者2........接收到消息:【hello, message_9】21:06:01.689189300
    消费者1接收到消息:【hello, message_38】21:06:01.705261
    消费者1接收到消息:【hello, message_40】21:06:01.746927300
    消费者1接收到消息:【hello, message_42】21:06:01.789835
    消费者1接收到消息:【hello, message_44】21:06:01.834393100
    消费者1接收到消息:【hello, message_46】21:06:01.875312100
    消费者2........接收到消息:【hello, message_11】21:06:01.889969500
    消费者1接收到消息:【hello, message_48】21:06:01.920702500
    消费者2........接收到消息:【hello, message_13】21:06:02.090725900
    消费者2........接收到消息:【hello, message_15】21:06:02.293060600
    消费者2........接收到消息:【hello, message_17】21:06:02.493748
    消费者2........接收到消息:【hello, message_19】21:06:02.696635100
    消费者2........接收到消息:【hello, message_21】21:06:02.896809700
    消费者2........接收到消息:【hello, message_23】21:06:03.099533400
    消费者2........接收到消息:【hello, message_25】21:06:03.301446400
    消费者2........接收到消息:【hello, message_27】21:06:03.504999100
    消费者2........接收到消息:【hello, message_29】21:06:03.705702500
    消费者2........接收到消息:【hello, message_31】21:06:03.906601200
    消费者2........接收到消息:【hello, message_33】21:06:04.108118500
    消费者2........接收到消息:【hello, message_35】21:06:04.308945400
    消费者2........接收到消息:【hello, message_37】21:06:04.511547700
    消费者2........接收到消息:【hello, message_39】21:06:04.714038400
    消费者2........接收到消息:【hello, message_41】21:06:04.916192700
    消费者2........接收到消息:【hello, message_43】21:06:05.116286400
    消费者2........接收到消息:【hello, message_45】21:06:05.318055100
    消费者2........接收到消息:【hello, message_47】21:06:05.520656400
    消费者2........接收到消息:【hello, message_49】21:06:05.723106700
    可以看到消费者1和消费者2竟然每人消费了25条消息:
    ● 消费者1很快完成了自己的25条消息
    ● 消费者2却在缓慢的处理自己的25条消息。
    也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
    3.1.2.5 能者多劳
    在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
    spring:
    rabbitmq:
    listener:
    simple:
     prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    
    再次测试,发现结果如下:
    消费者1接收到消息:【hello, message_0】21:12:51.659664200
    消费者2........接收到消息:【hello, message_1】21:12:51.680610
    消费者1接收到消息:【hello, message_2】21:12:51.703625
    消费者1接收到消息:【hello, message_3】21:12:51.724330100
    消费者1接收到消息:【hello, message_4】21:12:51.746651100
    消费者1接收到消息:【hello, message_5】21:12:51.768401400
    消费者1接收到消息:【hello, message_6】21:12:51.790511400
    消费者1接收到消息:【hello, message_7】21:12:51.812559800
    消费者1接收到消息:【hello, message_8】21:12:51.834500600
    消费者1接收到消息:【hello, message_9】21:12:51.857438800
    消费者1接收到消息:【hello, message_10】21:12:51.880379600
    消费者2........接收到消息:【hello, message_11】21:12:51.899327100
    消费者1接收到消息:【hello, message_12】21:12:51.922828400
    消费者1接收到消息:【hello, message_13】21:12:51.945617400
    消费者1接收到消息:【hello, message_14】21:12:51.968942500
    消费者1接收到消息:【hello, message_15】21:12:51.992215400
    消费者1接收到消息:【hello, message_16】21:12:52.013325600
    消费者1接收到消息:【hello, message_17】21:12:52.035687100
    消费者1接收到消息:【hello, message_18】21:12:52.058188
    消费者1接收到消息:【hello, message_19】21:12:52.081208400
    消费者2........接收到消息:【hello, message_20】21:12:52.103406200
    消费者1接收到消息:【hello, message_21】21:12:52.123827300
    消费者1接收到消息:【hello, message_22】21:12:52.146165100
    消费者1接收到消息:【hello, message_23】21:12:52.168828300
    消费者1接收到消息:【hello, message_24】21:12:52.191769500
    消费者1接收到消息:【hello, message_25】21:12:52.214839100
    消费者1接收到消息:【hello, message_26】21:12:52.238998700
    消费者1接收到消息:【hello, message_27】21:12:52.259772600
    消费者1接收到消息:【hello, message_28】21:12:52.284131800
    消费者2........接收到消息:【hello, message_29】21:12:52.306190600
    消费者1接收到消息:【hello, message_30】21:12:52.325315800
    消费者1接收到消息:【hello, message_31】21:12:52.347012500
    消费者1接收到消息:【hello, message_32】21:12:52.368508600
    消费者1接收到消息:【hello, message_33】21:12:52.391785100
    消费者1接收到消息:【hello, message_34】21:12:52.416383800
    消费者1接收到消息:【hello, message_35】21:12:52.439019
    消费者1接收到消息:【hello, message_36】21:12:52.461733900
    消费者1接收到消息:【hello, message_37】21:12:52.485990
    消费者1接收到消息:【hello, message_38】21:12:52.509219900
    消费者2........接收到消息:【hello, message_39】21:12:52.523683400
    消费者1接收到消息:【hello, message_40】21:12:52.547412100
    消费者1接收到消息:【hello, message_41】21:12:52.571191800
    消费者1接收到消息:【hello, message_42】21:12:52.593024600
    消费者1接收到消息:【hello, message_43】21:12:52.616731800
    消费者1接收到消息:【hello, message_44】21:12:52.640317
    消费者1接收到消息:【hello, message_45】21:12:52.663111100
    消费者1接收到消息:【hello, message_46】21:12:52.686727
    消费者1接收到消息:【hello, message_47】21:12:52.709266500
    消费者2........接收到消息:【hello, message_48】21:12:52.725884900
    消费者1接收到消息:【hello, message_49】21:12:52.746299900
    可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
    正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
    3.1.3 小结
    Work模型的使用:
    ● 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
    ● 通过设置prefetch来控制消费者预取的消息数量
相关文章
|
2月前
|
消息中间件 Java 数据安全/隐私保护
RabbitMQ集群部署
本文介绍RabbitMQ集群部署及高可用方案,涵盖普通集群搭建、镜像模式配置与仲裁队列使用。通过Docker部署三节点集群,配置Erlang Cookie与rabbitmq.conf实现节点通信;利用镜像模式实现数据冗余,支持主从切换;引入3.8版本后的仲裁队列,简化高可用配置,提升系统容错能力。
|
2月前
|
消息中间件 数据库 UED
1.1 同步调用与异步调用
本文介绍了微服务间的同步与异步调用。同步调用需等待结果返回,顺序执行,适合实时性高、操作简单的场景;异步调用发出请求后可继续执行其他任务,提升效率与资源利用率,适用于耗时操作。通过支付、点餐、挂号等生活实例对比,阐述了二者特点、适用场景及优缺点。
|
3月前
|
设计模式 缓存 安全
无锁编程与原子操作:构建极致性能的高并发队列
本文深入探讨无锁编程与原子操作在高并发队列中的应用,通过CAS、环形缓冲、版本化引用等技术,实现高性能、低延迟的线程安全队列,显著提升系统吞吐量,适用于日志、网络通信等高并发场景。
237 10
|
5月前
|
SQL 关系型数据库 MySQL
MySQL锁机制:并发控制与事务隔离
本文深入解析了MySQL的锁机制与事务隔离级别,涵盖锁类型、兼容性、死锁处理及性能优化策略,助你掌握高并发场景下的数据库并发控制核心技巧。
|
6月前
|
缓存 Java
线程池的核心参数
线程池七大参数解析:核心线程数决定常驻线程,最大线程数控制并发上限,存活时间管理非核心线程生命周期,工作队列缓存待处理任务,线程工厂定制线程属性,拒绝策略应对任务过载,提升系统稳定性与资源利用率。
359 1
|
3月前
|
JSON API 数据安全/隐私保护
拼多多搜索关键词获取商品信息的API接口
本文介绍如何通过拼多多开放平台API实现商品关键词搜索,涵盖注册账号、获取密钥、查阅文档、构造请求、解析数据等步骤,基于Python示例演示完整流程,并提醒权限限制、错误处理与安全事项,助力开发者高效获取商品信息。(238字)
520 0
|
4月前
|
机器学习/深度学习 人工智能 搜索推荐
拔俗AI体征营养指导系统:从数据到建议的技术闭环
AI如何读懂身体并给出科学营养建议?本文从开发者视角揭秘三大核心技术:多源异构数据融合,构建个性化推荐引擎,以及反馈驱动的持续学习系统。通过打通“感知-决策-反馈”闭环,AI真正实现千人千面的动态营养指导,成为可进化的健康伙伴。(238字)
213 0
|
5月前
|
存储 消息中间件 缓存
Redis 简介:打造快速数据存储的利器
Redis 是一款开源的内存数据结构服务器,支持字符串、哈希、列表等多种数据结构,具备高性能、持久化、高可用及分布式特性,适用于缓存、会话管理、实时统计等场景。
|
6月前
|
存储 搜索推荐 算法
归并排序算法
归并排序是一种基于分治思想的高效排序算法,通过将序列不断划分为不可再分的子序列,再两两合并完成排序。其时间复杂度为O(nlogn),适用于对序列进行升序或降序排列。
420 0
|
机器学习/深度学习 自然语言处理 算法
【数据挖掘】金山办公2020校招大数据和机器学习算法笔试题
金山办公2020校招大数据和机器学习算法笔试题的解析,涵盖了编程、数据结构、正则表达式、机器学习等多个领域的题目和答案。
316 10