本文已收录在Github,关注我,紧跟本系列专栏文章,咱们下篇再续!
- 🚀 魔都架构师 | 全网30W技术追随者
- 🔧 大厂分布式系统/数据中台实战专家
- 🏆 主导交易系统百万级流量调优 & 车联网平台架构
- 🧠 AIGC应用开发先行者 | 区块链落地实践者
- 🌍 以技术驱动创新,我们的征途是改变世界!
- 👉 实战干货:编程严选网
1 consumeFromWhere
package org.apache.rocketmq.common.consumer;
// 消费者从哪个位置开始消费
public enum ConsumeFromWhere {
// 第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET,
// 第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_FIRST_OFFSET,
// 第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP,
}
subscription
订阅
/**
* Subscription relationship
* topic和订阅关系
*/
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
offsetStore
消息进度存储,存储实际的偏移量,两种实现。
/**
* Offset store interface
* 消费进度存储
*/
public interface OffsetStore {
consumeThreadMin/consumeThreadMax
默认10,消费线程池数量/默认20, 消费线程数量
/**
* Minimum consumer thread number
* 最小消费线程数
*/
private int consumeThreadMin = 20;
/**
* Max consumer thread number
* 最大消费线程数
*/
private int consumeThreadMax = 20;
consumeConcurrentlyMaxSpan/pullThresholdForQueue
默认2000, 单队列并行消费允许的最大跨度
/**
* Concurrently max span offset.
* 并发同时最大跨度偏移。对顺序消费无影响
*/
private int consumeConcurrentlyMaxSpan = 2000;
默认1000,拉消息本地队列缓存消息最大数
/**
* 流控阈值,队列级别,每个消息队列默认最多缓存1000条消息
*/
private int pullThresholdForQueue = 1000;
pullInterval/pullBatchSize
默认0,拉消息间隔,由于是长轮询,所以为0,但若应用为流控,也可设置大于0的值,单位毫秒
/**
* 消息拉取间隔
*/
private long pullInterval = 0;
默认32, 批量拉消息,一次最多拉多少条
/**
* 批处理拉取大小
*/
private int pullBatchSize = 32;
consumeMessageBatchMaxSize
DefaultMQPushConsumer.java
/**
* 批量消费规模
*/
private int consumeMessageBatchMaxSize = 1;
默认1,批量消费,一次消费多少条消息。