RabbitMQ消费者性能优化相关配置说明

简介: RabbitMQ消费者性能优化相关配置说明
分布式消息中间件


RabbitMQ是用Erlang语言编写的分布式消息中间件,常常用在大型网站中作为消息队列来使用,主要目的是各个子系统之间的解耦和异步处理。消息中间件的基本模型是典型的生产者-消费者模型,生产者发送消息到消息队列,消费者监听消息队列,收到消息后消费处理。


在使用RabbitMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。


Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:


  • fanout
  • topic
  • direct


(1)Exchange为fanout时,生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用;


(2)Exchange为topic时,生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息;


(3)direct类型的Exchange是最直接最简单的,生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)


消费端yml配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
#        acknowledge-mode: manual  # 手动确定(默认自动确认)
        concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
        max-concurrency: 10 # 消费端的监听最大个数
        prefetch: 10
    connection-timeout: 15000   # 超时时间


在消费端,配置prefectch和concurrency参数便可以实现消费端mq并发处理消息,那么这两个参数具有有什么含义呢?

1 prefetch属性


prefetch是每次从一次性从broker里面取的待消费的消息的个数。


每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。


prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。


不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。


对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1。


如果要保证消息的可靠不丢失,当prefetch大于1时,可能会出现因为服务宕机引起的数据丢失,故建议将prefetch=1。


2 concurrency属性


concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数。在上面的yml配置中,concurrency=1,即每个Listener容器将开启一个线程去处理消息。在2.0以后的版本中,可以在注解中配置该参数:

//如果设置了exclusive=true,即消费者独占queue,且concurrency大于1,则会抛出异常
@RabbitListener(queues = QUEUE_NAME, concurrency = "4", exclusive = false)
public void consumerDelayMessage(Message message, Channel channel) {
    try {
        this.handle(message, channel);
    } catch (Exception e) {
        log.error("消费RabbitMQ消息失败,失败原因:{}", e.getMessage());
    }
}


在服务启动后,可以发现在Listener容器中产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。


3 prefech和concurrency属性


若一个消费者配置prefetch=10,concurrency=2,会有两个消费者(或是线程)同时监听Queue,但是注意这里的消息只要有被一个消费者消费掉就会自动ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值10,意味着每个消费者每次会预取10个消息准备消费(注意不是两个消费者去共享内存中抓取的消息)。


每个消费者对应的listener有个Exclusive参数,默认为false, 如果设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。


4 业务问题


前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。


设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 数据可视化 大数据
【如何安装和配置RabbitMQ(转载)】
【如何安装和配置RabbitMQ(转载)】
31 2
|
5月前
|
消息中间件 存储 负载均衡
Rabbitmq direct模式保证一个队列只对应一个消费者
Rabbitmq direct模式保证一个队列只对应一个消费者
112 0
|
5月前
|
消息中间件 存储 Shell
Docker部署RabbitMQ配置日志映射本地文件
Docker部署RabbitMQ配置日志映射本地文件
125 0
|
5月前
|
传感器 JSON 物联网
什么是MQTT遗嘱消息?如何配置和处理遗嘱消息?
什么是MQTT遗嘱消息?如何配置和处理遗嘱消息?
274 0
什么是MQTT遗嘱消息?如何配置和处理遗嘱消息?
|
5月前
|
存储 传感器 物联网
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
261 0
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
|
3月前
|
消息中间件 SQL Java
RabbitMQ之消费者可靠性
RabbitMQ之消费者可靠性
|
3月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
31 0
|
11天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
28 0
RabbitMQ入门指南(九):消费者可靠性
|
28天前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
28天前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1

热门文章

最新文章