RocketMQ Broker启动流程梳理

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: RocketMQ Broker启动流程梳理

640.jpg

Broker 启动的主函数入口:


org.apache.rocketmq.broker.BrokerStartup:

public static void main(String[] args) {
  start(createBrokerController(args));
}


1.创建配置类


初始化配置主要任务是根据 properties 文件以及命令行参数值,创建了以下配置类:

nettyServerConfig:封装了作为消息队列服务器的配置信息nettyClientConfig:封装了作为NameServer客户端配置信息brokerConfig:封装了 Broker 配置信息messageStoreConfig:封装了 RocketMQ 存储系统的配置信息


1.Broker 初始化


2.1 配置文件加载

主题配置加载:

result = result && this.consumerOffsetManager.load();

这一步主要是加载 topics.json 文件,并解析生成 TopicConfigSerializerWrapper 对象,并 set 进 topicConfigTable 中。


消费者位移管理加载:

result = result && this.subscriptionGroupManager.load();

这一步主要是加载 consumerOffset.json 文件,并解析生成 ConsumerOffsetManager 对象,并替换 offsetTable 成员值。


消费者订阅组加载:

result = result && this.consumerFilterManager.load();

这一步主要是加载 subscriptionGroup.json 文件,并解析生成 SubscriptionGroupManager 对象,并放进 subscriptionGroupTable 中。


消费者过滤管理加载:

result = result && this.consumerFilterManager.load();

这一步主要是加载 consumerFilter.json 文件,并解析生成 ConsumerFilterManager 对象


messageStore 消息存储初始化:

if (result) {
  try {
    this.messageStore =
      new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                              this.brokerConfig);
    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
    //load plugin
    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
  } catch (IOException e) {
    result = false;
    e.printStackTrace();
  }
}

这一步主要是创建了 DefaultMessageStore 对象,这是 Broker 消息寸处的核心实现,创建该对象时也会启动很多相关服务线程,用于管理 store 的存储。


messageStore加载:

result = result && this.messageStore.load();


1)延迟消息加载:加载 delayOffset.json 文件,解析生成DelayOffsetSerializerWrapper,并加入offsetTable中

2)commitLog加载:MappfileQueue映射文件队列加载,加载定义的storePath目录文件

3)consumeQueue加载


2.2 初始化线程池


创建nettyRemotingServer:根据前面初始化好的nettyConfig创建远程通讯服务

   •根据brokerConfig初始化各种线程池:


1)初始化发送消息线程池

2)初始化拉取消息线程池

3)初始化broker管理线程池

4)初始化client管理线程池

5)初始化消费者管理线程池


把这些线程池注册到nettyRemotingServer中

2.3 初始化定时任务:


在线程池注册完后,就会开启各种定时任务:

开启定时记录 Broker 的状态(消息拉取时间总和、消息发送总和等)

BrokerController.this.getBrokerStats().record();

消息位移持久化,定时向 consumerOffset.json 文件中写入消费者偏移量

BrokerController.this.consumerOffsetManager.persist();

消息过滤持久化,定时向 consumerFilter.json 文件写入消费者过滤器信息

BrokerController.this.consumerFilterManager.persist();

定时禁用消费慢的消费者以保护 Broker,可以设置 disableConsumeIfConsumerReadSlowly 属性,默认 false

BrokerController.this.protectBroker();

定时打印 Send、Pull、Query、Transaction 信息

BrokerController.this.printWaterMark();

定时打印已存储在提交日志中但尚未调度到消费队列的字节数

rokerController.this.getMessageStore().dispatchBehindBytes())

定时获取 namserver 地址

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

如果是从服务器:

定时从主服务器获取 TopicConfig、ConsumerOffset、DelayOffset、SubscriptionGroupConfig 等信息

BrokerController.this.slaveSynchronize.syncAll();

如果是主服务器:

定时打印从服务器落后的字节数

BrokerController.this.printMasterAndSlaveDiff();


2.4 添加进程退出时关闭broker资源的钩子函数


1.Broker 启动


3.1 messageStore启动:

启动各类线程服务:

 1)启动刷盘任务线程
 2)启动commitLog线程
 3)启动存储存储统计服务线程storeStateService
 4)启动延迟定时消息服务线程
 5)启动消息分发到各中Consumer queue服务线程reputMessageService
 6)启动HA主从同步线程


启动各类定时任务


3.2 启动netty服务:

remotingServer启动:启动远程通讯服务 fastRemotingServer启动:启动远程通讯服务 broker对外API启动:启动client远程通讯服务

3.3 pullRequestHolderService使拉取消息保持长轮询任务启动

3.4 ClientHouseKeepingService线程定时清除不活动链接任务启动

3.5 过滤服务器任务启动

3.6 向NameServer注册broker信息

3.7 开启定时向NameServer注册broker信息任务

640.jpg













相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
3月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
194 0
|
3月前
|
消息中间件 数据安全/隐私保护 RocketMQ
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
|
4月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
|
3月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
70 0
|
3月前
|
消息中间件 网络协议 API
RocketMQ - 生产者启动流程
RocketMQ - 生产者启动流程
38 0
|
5月前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
166 1
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
|
4月前
|
消息中间件 搜索推荐 RocketMQ
消息队列 MQ使用问题之如何将一个主题的多个分区分布到不同的Broker上
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。