一文搞懂 Apache RocketMQ 消费者关键配置,优化消息消费!

简介: 本文深入解析 RocketMQ 消费者相关配置,涵盖消费位点(consumeFromWhere)、订阅关系(subscription)、消费进度存储(offsetStore)、消费线程池数量(consumeThreadMin/Max)、并发消费跨度(consumeConcurrentlyMaxSpan)、拉取消息参数(pullInterval/pullBatchSize)及批量消费规模(consumeMessageBatchMaxSize)等内容。作者为魔都架构师,拥有丰富的大厂分布式系统实战经验,关注其专栏获取更多技术干货!

本文已收录在Github关注我,紧跟本系列专栏文章,咱们下篇再续!

  • 🚀 魔都架构师 | 全网30W技术追随者
  • 🔧 大厂分布式系统/数据中台实战专家
  • 🏆 主导交易系统百万级流量调优 & 车联网平台架构
  • 🧠 AIGC应用开发先行者 | 区块链落地实践者
  • 🌍 以技术驱动创新,我们的征途是改变世界!
  • 👉 实战干货:编程严选网

1 consumeFromWhere

package org.apache.rocketmq.common.consumer;

// 消费者从哪个位置开始消费
public enum ConsumeFromWhere {
   // 第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
   CONSUME_FROM_LAST_OFFSET,

   // 第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
   CONSUME_FROM_FIRST_OFFSET,
   // 第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
   CONSUME_FROM_TIMESTAMP,
}

subscription

订阅

/**
* Subscription relationship
* topic和订阅关系
*/

private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

offsetStore

消息进度存储,存储实际的偏移量,两种实现。

/**
* Offset store interface
* 消费进度存储
*/

public interface OffsetStore {

consumeThreadMin/consumeThreadMax

默认10,消费线程池数量/默认20, 消费线程数量

/**
* Minimum consumer thread number
* 最小消费线程数
*/

private int consumeThreadMin = 20;

/**
* Max consumer thread number
* 最大消费线程数
*/

private int consumeThreadMax = 20;

consumeConcurrentlyMaxSpan/pullThresholdForQueue

默认2000, 单队列并行消费允许的最大跨度

/**
* Concurrently max span offset.
* 并发同时最大跨度偏移。对顺序消费无影响
*/

private int consumeConcurrentlyMaxSpan = 2000;

默认1000,拉消息本地队列缓存消息最大数

/**
* 流控阈值,队列级别,每个消息队列默认最多缓存1000条消息
*/

private int pullThresholdForQueue = 1000;

pullInterval/pullBatchSize

默认0,拉消息间隔,由于是长轮询,所以为0,但若应用为流控,也可设置大于0的值,单位毫秒

/**
* 消息拉取间隔
*/

private long pullInterval = 0;

默认32, 批量拉消息,一次最多拉多少条

/**
* 批处理拉取大小
*/

private int pullBatchSize = 32;

consumeMessageBatchMaxSize

DefaultMQPushConsumer.java

/**
* 批量消费规模
*/

private int consumeMessageBatchMaxSize = 1;

默认1,批量消费,一次消费多少条消息。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
11月前
|
Linux 网络安全 Apache
CentOS 7.2配置Apache服务httpd(上)
CentOS 7.2配置Apache服务httpd(上)
681 1
|
11月前
|
存储 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
137 1
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
|
11月前
|
缓存 前端开发 应用服务中间件
CORS跨域+Nginx配置、Apache配置
CORS跨域+Nginx配置、Apache配置
542 7
|
11月前
apache+tomcat配置多站点集群的方法
apache+tomcat配置多站点集群的方法
121 4
|
11月前
|
消息中间件 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
117 2
|
11月前
|
负载均衡 应用服务中间件 Apache
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
175 3
|
11月前
|
存储 消息中间件 druid
大数据-151 Apache Druid 集群模式 配置启动【上篇】 超详细!
大数据-151 Apache Druid 集群模式 配置启动【上篇】 超详细!
213 1
|
11月前
|
Linux PHP Apache
CentOS 7.2配置Apache服务httpd(下)
CentOS 7.2配置Apache服务httpd(下)
139 1
|
11月前
|
存储 Apache 开发工具
apache的主要目录结构及常见的配置选项的详细说明(图例展示)
apache的主要目录结构及常见的配置选项的详细说明(图例展示)
269 1

推荐镜像

更多