MQ 使用场景: 应用解耦、消峰填谷,消息分发。
Consumer
RocketMQ Consumer 分为 Pull Consumer 和 Push Consumer ,其实就是推拉消费者。
- Pull Consumer
- Push Consumer
DefaultMQPushConsumer
DefaultMQPushCOnsumerImpl 通过 start 方法启动
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
DefaultMQPushConsumer#start 启动代码:
public void start() throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
先简化说一下启动流程:
启动步骤如下, 启动准备,从nameserver 获取 topic 路由信息,检查 Consumer 配置,向每个 broker 发心跳,触发一次 rebalance.
public synchronized void start() throws MQClientException { // 1. 启动准备工作 switch (this.serviceState) {...} // 2. 从 NameServer 更新 topic 路由信息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); // 3. 检查 consumer 配置 this.mQClientFactory.checkClientInBroker(); // 4. 向每个broker 发送心跳 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 5. 立即触发一次 reblance this.mQClientFactory.rebalanceImmediately(); }
启动准备工作中有 CREATE_JUST
状态判断。CREATE_JUST
的意思是 Service just created,not start
创建服务,没有启动。
start 启动核心代码在 DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; // 1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置 this.checkConfig(); // 2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } // 3. 获取MQ 实例,先从缓存中国取,没有则创建 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); // 4. 设置 重负载的消费组 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); //设置消费模式 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); // 设置负载策略 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); // 实例化消息拉取的包装类并注册消息过滤的消息类 this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); // 选择对应的 offset 实现类,并加载 offset 集群,广播消息的 offset 从消费者本地获取,集群模式从 Brocker 维护。 if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); // 启动消息消费者服务 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); // 8. 启动 MQ ClinentInstance boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } //9. 从nameserver拉取 topic 订阅消息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); //10. 向 broker 校验客户端 this.mQClientFactory.checkClientInBroker(); // 11. 给所有的 broker发送心跳。 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 12. 负载队列 this.mQClientFactory.rebalanceImmediately(); }
咱们细化下启动过程
1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置 2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC 3. 获取MQ实例,先从缓存中取,没有则创建 4. 设置重负载的消费组、消费模式、负载策略等 5. 实例化消息拉取的包装类并注册消息过滤的钩子 选择对应的OffsetStore实现类,并加载offset。广播消息从本地获取offset (Consumer本地维护),6. 集群消息从远程获取 (broker端维护) 7. 启动消息消费服务 7.1 并发消费:15分钟执行一次,过期消息(15分钟还未被消费的消息)一次最多清理16条过期消息,将过期消息发回给broker 7.2 顺序消费:20s定时锁定当前实例消费的所有队列,上锁成功将ProcessQueue的lock属性设置为true 8. 启动MQClientInstance 8.1 启动请求和响应的通道 8.2 开启定时任务 8.2.1 定时30s拉取最新的broker和topic的路由信息 8.2.2.定时30s向broker发送心跳包 8.2.3 定时5s持久化consumer的offset 8.2.4 定时1分钟,动态调整线程池线程数量 8.3 启动消息拉取服务 8.4 每20s重新负载一次 9. 从nameserver拉取topic的订阅信息 10. 向broker校验客户端 11. 给所有的broker的master发送心跳包 、上传filterClass的源文件给FilterServer 12. 立即负载队列
内容比较多,主要关心如下几点:
拷贝订阅消息
构建主体订阅消息 SubscriptionData 并加入到 RebalanceImpl 的订阅消息中,订阅关系主要来自两个:
- 通过调用 DefaltMQPushConsumerImpl#subscribe 方法
- 订阅主题消息,RocketMQ 消息重试以消费组为单位,而不是主题,消息重试主题为
%RETRY%+消费组
. 消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。
private void copySubscription() throws MQClientException { try { Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
初始化消息进度
其实说的就是加载 offset , 如果消息消费是集群模式,那么消费进度(offset)保存在 Broker 上;如果是广播方式,消息进度存储在消费端。
switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
创建消费者线程服务
根据消费者是否顺序消费, 创建消费端消费线程服务。consumeMessageService
负责消息消费,内部维护的是个线程池。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start();
向MQClientInstance 注册消费者,并启动
向 MQClientInstance 注册消费者,并启动MQClientInstance。在一个 JVM 中所有的生产者和消费者都持有同一个 MQClientInstance, MQClientInstance, 只会启动一次。
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start();
总结
介绍了一下 MQ
消费者启动过程,校验配置,获取订阅消息,设置消费者的消费模式,负载策略。加载消息进度,启动消息消费者服务,并向MQClientInstance
注册消费者Group和并启动MQClientInstance
,从 nameserver
拉取 topic
订阅信息,向 Broker
发送心跳包。