SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)

简介: SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)

1. 前言


上一篇文章,实现了用 SpringBoot实现RabbitMQ的简单队列, 篇文章 操作 用SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)


Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用 work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

注意:不一定是两个消费者。


2. 思路


代码实现思路:

1.在 publisher 服务中定义测试方法,产生50条消息(每隔20ms发送一条),发送到 test2024.simple.queue

2.在 consumer 服务中定义两个消息监听者,都监听simple.queue队列

3.消费者1处理50条消息(每隔20ms处理一条),消费者2处理50条消息(每隔100ms处理一条)


3. 消息发送


发送消息前先把 服务接收 的服务停止调.


循环 50 次, 每次休息 20 ms , 发送消息到指定队列

@Test
public void test01() throws InterruptedException {
  //  声明队列名称
   String queueName = "test2024.simple.queue";
   String message = "work_";
   for (int i = 1; i <= 50; i++) {
       // 发送消息
       rabbitTemplate.convertAndSend(queueName, message + i);
       Thread.sleep(20);
   }
}

发送完消息后, 我们看到消息整整齐齐的在 队列里躺着.


4. 消息接收


来连个监听器监听指定的队列, 但是两个监听队列的消费时间不一样.

@Component
public class SpringRabbitListener {

    //  监听指定队列,Spring只要接收到该队列的消息就会接收消息
    @RabbitListener(queues = "test2024.simple.queue")
    public void rabbitListener1(String message){
        System.out.println("1号接收器-接收到消息:" + message);;
    }

    //  监听指定队列,Spring只要接收到该队列的消息就会接收消息
    @RabbitListener(queues = "test2024.simple.queue")
    public void rabbitListener2(String message){
        System.out.println("2号接收器-接收到消息:" + message);;
    }

}

此时启动消费者服务,然后再启动发送消息.

可以看到 1 号接收器很快消费了 25 条, 然后 2 号接收器缓慢的完成自己的… 1 号接收器没有给 2 号接收器帮忙. 那么咋生产环境中 ,就会造成服务闲置的情况下不能及时消费消息.


说明:阐述上述原因是因为队列平均分配给每个消费者,即使当前消费者没有消费完,队列也会将消息分配给消费者。然后消费者一个一个消息消费,即使消费很快的消费者,消费完毕,而消费很慢的消费者一直在消费。这样很不合理。应该是哪个消费者消费快应该多消费。哪个消费者消费慢应该少消费。


4.1 能者多劳

在 Spring 中有一个简单的配置叫预取 prefetch,可以解决这个问题。我们修改 consumer 服务的 application.yml 文件,添加配置:

 listener:
      simple:
        prefetch: 1   # 消费者每次最多只能预取一条消息,当消费完这条消息后,才能获取下一个消息,这样做的好处是消费能力强的消费者,处理的消息就会更多===》能者多劳

做了如上配置后,启动消费者服务,再次发送消息:

我们看到消费能力强的1号接收器完成了更多的工作,这样就达到了能者多劳.


总结


Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
30天前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
1月前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
1月前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
141 0
|
1月前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
48 1
|
1月前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
49 0
|
1月前
|
消息中间件 Java
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
16 1
|
5天前
|
消息中间件 测试技术 开发工具
消息队列 MQ操作报错合集之收到"WARN RocketmqClient - consumeMessage Orderly return"警告,是什么原因
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
5天前
|
消息中间件 设计模式 网络安全
消息队列 MQ操作报错合集之broker启用controller配置时,遇到报错,是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
5天前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
5天前
|
消息中间件 Apache RocketMQ
消息队列 MQ操作报错合集之设置了controller后,有一主一从,但只显示一个,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。