深入理解RabbitMQ中的prefetch_count参数

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 在某一次用户标签服务中大量用到异步流程,使用了RabbitMQ进行解耦。其中,为了提高消费者的处理效率针对了不同节点任务的消费者线程数和prefetch_count参数都做了调整和测试,得到一个相对合理的组合。这里深入分析一下prefetch_count参数在RabbitMQ中的作用。

微信截图_20220513173717.png


前提



在某一次用户标签服务中大量用到异步流程,使用了RabbitMQ进行解耦。其中,为了提高消费者的处理效率针对了不同节点任务的消费者线程数和prefetch_count参数都做了调整和测试,得到一个相对合理的组合。这里深入分析一下prefetch_count参数在RabbitMQ中的作用。


prefetch_count参数的含义



先从AMQPAdvanced Message Queuing Protocol,即高级消息队列协议,RabbitMQ实现了此协议的0-9-1版本的大部分内容)和RabbitMQ的具体实现去理解prefetch_count参数的含义,可以查阅对应的文档(见文末参考资料)。AMQP 0-9-1定义了basic.qos方法去限制消费者基于某一个Channel或者Connection上未进行ack的最大消息数量上限。basic.qos方法支持两个参数:

  • global:布尔值。
  • prefetch_count:整数。


这两个参数在AMQP 0-9-1定义中的含义和RabbitMQ具体实现时有所不同,见下表:


global参数值 AMQP 0-9-1prefetch_count参数的含义 RabbitMQprefetch_count参数的含义
false prefetch_count值在当前Channel的所有消费者共享 prefetch_count对于基于当前Channel创建的消费者生效
true prefetch_count值在当前Connection的所有消费者共享 prefetch_count值在当前Channel的所有消费者共享


或者用简洁的英文表格理解:


global prefetch_count in AMQP 0-9-1 prefetch_count in RabbitMQ
false Per channel limit Per customer limit
true Per connection limit Per channel limit


这里画一个图理解一下:


微信截图_20220513173726.png


上图仅仅为了区分协议本身和RabbitMQ中实现的不同,接着说说prefetch_count对于消费者(线程)和待消费消息的作用。假定一个前提:RabbitMQ客户端从RabbitMQ服务端获取到队列消息的速度比消费者线程消费速度快,目前有两个消费者线程共用一个Channel实例。当global参数为false时候,效果如下:


微信截图_20220513173734.png


而当global参数为true时候,效果如下:


微信截图_20220513173740.png


在消费者线程处理速度远低于RabbitMQ客户端从RabbitMQ服务端获取到队列消息的速度的场景下,prefetch_count条未进行ack的消息会暂时存放在一个队列(准确来说是阻塞队列,然后阻塞队列中的消息任务会流转到一个列表中遍历回调消费者句柄,见下一节的源码分析)中等待被消费者处理。这部分消息会占据JVM的堆内存,所以在性能调优或者设定应用程序的初始化和最大堆内存的时候,如果刚好用到RabbitMQ的消费者,必须要考虑这些"预取消息"的内存占用量。不过值得注意的是:prefetch_countRabbitMQ服务端的参数,它的设置值或者快照都不会存放在RabbitMQ客户端。同时需要注意prefetch_count生效的条件和特性(从参数设置的一些demo和源码上感知):


  • prefetch_count参数仅仅在basic.consumeautoAck参数设置为false的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。
  • basic.consume如果在非自动确认模式下忘记了手动调用basic.ack,那么prefetch_count正是未ack消息数量的最大上限。
  • prefetch_count是由RabbitMQ服务端控制,一般情况下能保证各个消费者线程中的未ack消息分发是均衡的,这点笔者猜测是consumerTag起到了关键作用。


RabbitMQ客户端中prefetch_count源码跟踪



编写本文的时候引入的RabbitMQ客户端版本为:com.rabbitmq:amqp-client:5.9.0


上面说了这么多都只是根据官方的文档或者博客中的理论依据进行分析,其实更加根本的分析方法是直接阅读RabbitMQJava客户端源码,主要是针对basic.qosbasic.consume两个方法,对应的是com.rabbitmq.client.impl.ChannelN#basicQos()com.rabbitmq.client.impl.ChannelN#basicConsume()两个方法。先看ChannelN#basicQos()


微信截图_20220513173749.png

微信截图_20220513173756.png


这里的basicQos()方法多了一个prefetchSize参数,用于限制分发内容的大小上限,默认值0代表无限制,而prefetchCount的取值范围是[0,65535],取值为0也是代表无限制。这里的ChannelN#basicQos()实现中直接封装basic.qos方法参数进行一次RPC调用,意味着直接更变RabbitMQ服务端的配置,即时生效,同时参数值完全没有保存在客户端代码中,印证了前面一节的结论。接着看ChannelN#basicConsume()方法:


微信截图_20220513173803.png


上图已经把关键部分用红圈圈出,因为整个消息消费过程是异步的,涉及太多的类和方法,这里不全量贴出,整理了一个流程图:


微信截图_20220513173819.png


整个消息消费过程,prefetch_count参数并未出现在客户端代码中,又再次印证了前面一节的结论,即prefetch_count参数的行为和作用完全由RabbitMQ服务端控制。而最终Customer或者常用的DefaultCustomer句柄是在WorkPoolRunnable中回调的,这类任务的执行线程来自于ConsumerWorkService内部的线程池,而这个线程池又使用了Executors.newFixedThreadPool()去构建,使用了默认的线程工厂类,因此在Customer#handleDelivery()方法内部打印的线程名称的样子是pool-1-thread-*


这里VariableLinkedBlockingQueue就是前一节中的message queue的原型


prefetch_count参数使用



设置prefetch_count参数比较简单,就是调用Channel#basicQos()方法:


public class RabbitQos {
    static String QUEUE = "qos.test";
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.basicQos(2);
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("1------" + Thread.currentThread().getName());
                sleep();
            }
        });
        channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("2------" + Thread.currentThread().getName());
                sleep();
            }
        });
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
        }
        sleep();
    }
    private static void sleep() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception ignore) {
        }
    }
}
复制代码


上面是原生的amqp-client的写法,如果使用了spring-amqpspring-boot-starter-amqp),可以通过配置文件中的spring.rabbitmq.listener.direct.prefetch属性指定所有消费者线程的prefetch_count,如果要针对部分消费者线程进行该属性的设置,则需要针对RabbitListenerContainerFactory进行改造。


prefetch_count参数最佳实践



关于prefetch_count参数的设置,RabbitMQ官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》。该文章分析了消息流控的整个流程,其中提到了prefetch_count参数的一些指标:


微信截图_20220513174046.png


这里指出了,如果prefetch_count的值超过了30,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count的值就会变得收效甚微。也就是说,官方是建议把prefetch_count设置为30。这里再参看一下spring-boot-starter-amqp中对此参数定义的默认值,具体是AbstractMessageListenerContainer中的DEFAULT_PREFETCH_COUNT


微信截图_20220513173828.png


如果没有通过spring.rabbitmq.listener.direct.prefetch进行覆盖,那么使用spring-boot-starter-amqp中的注解定义的消费者线程中设置的prefetch_count就是250


笔者认为,应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count的设置。总结如下(个人经验仅供参考):


  • 当消费者线程的处理速度十分慢,而队列的消息量十分少的场景下,可以考虑把prefetch_count设置为1
  • 当队列中的每条消息的数据报十分大的时候,要计算好客户端可以容纳的未ack总消息量的内存极限,从而设计一个合理的prefetch_count值。
  • 当消费者线程的处理速度十分快,远远大于RabbitMQ服务端的消息分发,在网络带宽充足的前提下,设置可以把prefetch_count值设置为0,不做任何的消息流控。
  • 一般场景下,建议使用RabbitMQ官方的建议值30或者spring-boot-starter-amqp中的默认值250


小结



小结一下:

  • prefetch_countRabbitMQ服务端的参数,设置后即时生效。
  • prefetch_count对于AMQP-0-9-1中的定义与RabbitMQ中的实现不完全相同。
  • prefetch_count值设置建议使用框架提供的默认值或者通过分组实验结合数据报大小进行计算和评估出一个合理值。


彩蛋



笔者把文章发布到公众号和朋友圈后,笔者的师傅作了点评,指出其中的一点不足:


微信截图_20220513173840.png


确实如此,prefetch_count的本质作用就是消费者的流控,官方的那篇文章也提到了网络和带宽的重要性,所以要考虑RTTRound-Trip Time,往返时延),这里的RTT概念来源于《计算机网络原理》:


The RTT includes packet-propagation delays, packet-queuing delays and packet -processing delay.


也就是说RTT = 数据包传播时延(往返)+ 数据包排队时延(路由器和交换机的)+ 数据处理时延(应用程序处理耗时,用在本文的场景就是消费者处理消息的耗时)。假设RTT中只计算网络的时延,不包含数据处理的时延,那么数据包往返需要2RTT,也就是一条消费消息处理的数据包的往返,RTT越大,那么数据传输成本越高,应该允许客户端"预取"更多的未ack消息避免消费者线程等待。这样就可以计算出单个消费者线程处理达到最饱和状态下的"预取"消息量:prefetch_count = 2RTT / 消费者线程处理单条消息的耗时。依照此概念举例:


  • RTT30ms,而消费者线程处理单条消息的耗时为10ms,此时,消费速率占优势,可以考虑把prefetch_count设置为6或者更大的值(考虑堆内存极限的限制)。
  • RTT30ms,而消费者线程处理单条消息的耗时为200msRTT占优势,消费速率滞后,此时考虑把prefetch_count设置为1即可。


思考:为什么spring-boot-starter-amqp把prefetch_count默认值设置为250这么高的值,很少开发者改动它却没有出现明显问题?


(本文完 c-4-d e-a-20201017)

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 缓存 Apache
消息队列 MQ使用问题之对于Grpc参数的调优,该如何操作
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 监控 Java
在RocketMQ中,Proxy的gRPC参数调优是一项重要的性能优化工作
在RocketMQ中,Proxy的gRPC参数调优是一项重要的性能优化工作【1月更文挑战第10天】【1月更文挑战第46篇】
104 2
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
695 0
|
消息中间件 存储 缓存
RocketMQ参数约束和建议
Apache RocketMQ 系统中存在很多自定义参数和资源命名,您在使用 Apache RocketMQ 时建议参考如下说明规范系统设置,避对某些具体参数设置不合理导致应用出现异常。
228 0
EMQ
|
负载均衡 安全 网络协议
创建 MQTT 连接时如何设置参数?
本文将详细讲解创建MQTT连接时各个连接参数的作用,帮助开发者迈出使用MQTT的第一步。
EMQ
1129 0
|
23天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
63 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
63 7
|
21天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!