深度解析RocketMQ Topic的创建机制

简介: 我还记得第一次使用rocketmq的时候,需要去控制台预先创建topic,我当时就想为什么要这么设计,于是我决定撸一波源码,带大家从根源上吃透rocketmq topic的创建机制。

我还记得第一次使用rocketmq的时候,需要去控制台预先创建topic,我当时就想为什么要这么设计,于是我决定撸一波源码,带大家从根源上吃透rocketmq topic的创建机制。


topic在rocketmq的设计思想里,是作为同一个业务逻辑消息的组织形式,它仅仅是一个逻辑上的概念,而在一个topic下又包含若干个逻辑队列,即消息队列,消息内容实际是存放在队列中,而队列又存储在broker中,下面我用一张图来说明topic的存储模型:

640.png其实rocketmq中存在两种不同的topic创建方式,一种是我刚刚说的预先创建,另一种是自动创建,下面我开车带大家从源码的角度来详细地解读这两种创建机制。


自动创建



默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic:


org.apache.rocketmq.common.MixAll:

// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";


自动创建的开关配置在BrokerConfig中,通过autoCreateTopicEnable字段进行控制,

org.apache.rocketmq.common.BrokerConfig:

@ImportantField
private boolean autoCreateTopicEnable = true;


在broker启动时,会调用TopicConfigManager的构造方法,autoCreateTopicEnable打开后,会将“TBW102”保存到topicConfigTable中:


org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:

// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
    String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    TopicConfig topicConfig = new TopicConfig(topic);
    this.systemTopicList.add(topic);
    topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                                 .getDefaultTopicQueueNums());
    topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                                  .getDefaultTopicQueueNums());
    int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
    topicConfig.setPerm(perm);
    this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}


broker会通过发送心跳包将topicConfigTable的topic信息发送给nameserver,nameserver将topic信息注册到RouteInfoManager中。


继续看消息发送时是如何从nameserver获取topic的路由信息:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    // 生产者第一次发送消息,topic在nameserver中并不存在
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
  }
  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
  } else {
    // 第二次请求会将isDefault=true,开启默认“TBW102”从namerserver获取路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
  }
}


如上方法,topic首次发送消息,此时并不能从namserver获取topic的路由信息,那么接下来会进行第二次请求namserver,这时会将isDefault=true,开启默认“TBW102”从namerserver获取路由信息,此时的“TBW102”topic已经被broker默认注册到nameserver了:


org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

if (isDefault && defaultMQProducer != null) {
  // 使用默认的“TBW102”topic获取路由信息
  topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
  if (topicRouteData != null) {
    for (QueueData data : topicRouteData.getQueueDatas()) {
      int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
      data.setReadQueueNums(queueNums);
      data.setWriteQueueNums(queueNums);
    }
  }
}


如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启自动创建开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。


org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
  changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
  log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}


从本地缓存中取出topic的路由信息,由于topic是第一次发送消息,这时本地并没有该topic的路由信息,所以对比该topic路由信息对比“TBW102”时changed为true,即有变化,进入以下逻辑:


org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

// Update sub info
{
  Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
  Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  while (it.hasNext()) {
    Entry<String, MQConsumerInner> entry = it.next();
    MQConsumerInner impl = entry.getValue();
    if (impl != null) {
      impl.updateTopicSubscribeInfo(topic, subscribeInfo);
    }
  }
}


将“TBW102”topic路由信息构建TopicPublishInfo,并将用topic为key,TopicPublishInfo为value更新本地缓存,到这里就明白了,原来broker们千辛万苦创建“TBW102”topic并将其路由信息注册到nameserver,被新来的topic获取后立即用“TBW102”topic的路由信息构建出一个TopicPublishInfo并且据为己有,由于TopicPublishInfo的路由信息时默认“TBW102”topic,因此真正要发送消息的topic也会被负载发送到“TBW102”topic所在的broker中,这里我们可以将其称之为偷梁换柱的做法。


当broker接收到消息后,会在msgCheck方法中调用createTopicInSendMessageMethod方法,将topic的信息塞进topicConfigTable缓存中,并且broker会定时发送心跳将topicConfigTable发送给nameserver进行注册。


自动创建与消息发送时获取topic信息的时序图:

640.jpg


预先创建



其实这个叫预先创建似乎更加适合,即预先在broker中创建好topic的相关信息并注册到nameserver中,然后client端发送消息时直接从nameserver中获取topic的路由信息,但是手动创建从动作上来将更加形象通俗易懂,直接告诉你,你的topic信息需要在控制台上自己手动创建。


预先创建需要通过mqadmin提供的topic相关命令进行创建,执行:

./mqadmin updateTopic


官方给出的各项参数如下:

usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
-t <arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg>       create topic to which broker
-c,--clusterName <arg>      create topic to which cluster
-h,--help                   Print help
-n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order <arg>            set topic's order(true|false
-p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums <arg>    set read queue nums
-s,--hasUnitSub <arg>       has unit sub (true|false
-t,--topic <arg>            topic name
-u,--unit <arg>             is unit topic (true|false
-w,--writeQueueNums <arg>   set write queue nums


我们直接定位到其实现类执行命令的方法:


通过broker模式创建:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

// -b,--brokerAddr <arg>   create topic to which broker
if (commandLine.hasOption('b')) {
  String addr = commandLine.getOptionValue('b').trim();
  defaultMQAdminExt.start();
  defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  return;
}


从commandLine命令行工具获取运行时-b参数重的broker的地址,defaultMQAdminExt是默认的rocketmq控制台执行的API,此时调用start方法,该方法创建了一个mqClientInstance,它封装了netty通信的细节,接着就是最重要的一步,调用createAndUpdateTopicConfig将topic配置信息发送到指定的broker上,完成topic的创建。


通过集群模式创建:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

// -c,--clusterName <arg>   create topic to which cluster
else if (commandLine.hasOption('c')) {
  String clusterName = commandLine.getOptionValue('c').trim();
  defaultMQAdminExt.start();
  Set<String> masterSet =
    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
  for (String addr : masterSet) {
    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
    System.out.printf("create topic to %s success.%n", addr);
  }
  return;
}


通过集群模式创建与通过broker模式创建的逻辑大致相同,多了根据集群从nameserver获取集群下所有broker的master地址这个步骤,然后在循环发送topic信息到集群中的每个broker中,这个逻辑跟指定单个broker是一致的。


这也说明了当用集群模式去创建topic时,集群里面每个broker的queue的数量相同,当用单个broker模式去创建topic时,每个broker的queue数量可以不一致。


预先创建时序图:

640.png


何时需要预先创建Topic?



建议线下开启,线上关闭,不是我说的,是官方给出的建议:

640.png


rocketmq为什么要这么设计呢?经过一波源码深度解析后,我得到了我想要的答案:


根据上面的源码分析,我们得出,rocketmq在发送消息时,会先去获取topic的路由信息,如果topic是第一次发送消息,由于nameserver没有topic的路由信息,所以会再次以“TBW102”这个默认topic获取路由信息,假设broker都开启了自动创建开关,那么此时会获取所有broker的路由信息,消息的发送会根据负载算法选择其中一台Broker发送消息,消息到达broker后,发现本地没有该topic,会在创建该topic的信息塞进本地缓存中,同时会将topic路由信息注册到nameserver中,那么这样就会造成一个后果:以后所有该topic的消息,都将发送到这台broker上,如果该topic消息量非常大,会造成某个broker上负载过大,这样的消息存储就达不到负载均衡的效果了。



相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
11月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
281 2
|
12月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
415 3
|
9月前
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
10687 46
|
11月前
|
存储 缓存 监控
后端开发中的缓存机制:深度解析与最佳实践####
本文深入探讨了后端开发中不可或缺的一环——缓存机制,旨在为读者提供一份详尽的指南,涵盖缓存的基本原理、常见类型(如内存缓存、磁盘缓存、分布式缓存等)、主流技术选型(Redis、Memcached、Ehcache等),以及在实际项目中如何根据业务需求设计并实施高效的缓存策略。不同于常规摘要的概述性质,本摘要直接点明文章将围绕“深度解析”与“最佳实践”两大核心展开,既适合初学者构建基础认知框架,也为有经验的开发者提供优化建议与实战技巧。 ####
|
11月前
|
缓存 NoSQL Java
千万级电商线上无阻塞双buffer缓冲优化ID生成机制深度解析
【11月更文挑战第30天】在千万级电商系统中,ID生成机制是核心基础设施之一。一个高效、可靠的ID生成系统对于保障系统的稳定性和性能至关重要。本文将深入探讨一种在千万级电商线上广泛应用的ID生成机制——无阻塞双buffer缓冲优化方案。本文从概述、功能点、背景、业务点、底层原理等多个维度进行解析,并通过Java语言实现多个示例,指出各自实践的优缺点。希望给需要的同学提供一些参考。
188 8
|
10月前
|
PHP 开发者 UED
PHP中的异常处理机制解析####
本文深入探讨了PHP中的异常处理机制,通过实例解析try-catch语句的用法,并对比传统错误处理方式,揭示其在提升代码健壮性与可维护性方面的优势。文章还简要介绍了自定义异常类的创建及其应用场景,为开发者提供实用的技术参考。 ####
|
10月前
|
Java 数据库连接 开发者
Java中的异常处理机制:深入解析与最佳实践####
本文旨在为Java开发者提供一份关于异常处理机制的全面指南,从基础概念到高级技巧,涵盖try-catch结构、自定义异常、异常链分析以及最佳实践策略。不同于传统的摘要概述,本文将以一个实际项目案例为线索,逐步揭示如何高效地管理运行时错误,提升代码的健壮性和可维护性。通过对比常见误区与优化方案,读者将获得编写更加健壮Java应用程序的实用知识。 --- ####
|
11月前
|
Java 开发者 Spring
深入解析:Spring AOP的底层实现机制
在现代软件开发中,Spring框架的AOP(面向切面编程)功能因其能够有效分离横切关注点(如日志记录、事务管理等)而备受青睐。本文将深入探讨Spring AOP的底层原理,揭示其如何通过动态代理技术实现方法的增强。
401 8
|
11月前
|
Java 测试技术 API
Java 反射机制:深入解析与应用实践
《Java反射机制:深入解析与应用实践》全面解析Java反射API,探讨其内部运作原理、应用场景及最佳实践,帮助开发者掌握利用反射增强程序灵活性与可扩展性的技巧。
407 5
|
11月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
252 3

推荐镜像

更多
  • DNS
  • 下一篇
    oss教程