RocketMQ源码分析-NameServer解析一

简介: 开篇先上一张RocketMQ官方的经典架构图,如下图所示:

微信截图_20220531114752.png

开篇先上一张RocketMQ官方的经典架构图,如下图所示:微信截图_20220531114839.png

Rocketmq是开源的消息中间件,主要是四部分由NameServer,Producer,Broker,Consumer构成 1)NameServer:类似于注册中心,负责Topic和路由信息的管理 2)Producer:生产者,消息的发送端 3)Broker:消息存储,以及消息转发 4)Consumer:消费者,消费消息 NameServer:是一个近乎无状态的节点,由上面的架构图可知,每台NameServer相互独立,Broker与每台NameServer保持长连接通信。还是老规矩,怎么找NameServer的入口,就看他的启动脚本就完事了NameServer的启动脚本位于源码rocketmq/distribution/bin/mqnamesrv里由脚本可知org.apache.rocketmq.namesrv.NamesrvStartup是启动类,一起看下源码,以下源码解析均为重要方法选择性解析,全面解析的话时间不是很充足。

public static void main(String[] args) {
    main0(args);
}
public static NamesrvController main0(String[] args) {
    try {
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
复制代码

1)启动类可以看出NamesrvController是整个NameServer的核心控制

看下NamesrvController的类图

微信截图_20220531114933.png

//代码
// namerServer 配置信息
private final NamesrvConfig namesrvConfig;
// netty配置信息
private final NettyServerConfig nettyServerConfig;
// 调度任务
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(    "NSScheduledThread"));
// 读取或变更NameServer的属性 加载NamerServer的属性到内存   
private final KVConfigManager kvConfigManager; 
// 记录Broker,Topic信息以及关联
private final RouteInfoManager routeInfoManager;
// netty server端
private RemotingServer remotingServer;
// 处理netty事件接口(连接,关闭,异常,心跳事件)
private BrokerHousekeepingService brokerHousekeepingService;
// 业务线程池private ExecutorService remotingExecutor;全局配置类 数据版本号,存储base路径等等
private Configuration configuration;      
// 监听证书文件是否有有变化
private FileWatchService fileWatchService;
复制代码

2)NamesrvController属性

NamesrvConfig:主要是NameServer的一些配置kvConfigPath,rocketmqHome等

如下NettyServerConfig(netty相关配置)源码

public class NettyServerConfig implements Cloneable {
    //Server端端口
    private int listenPort = 8888;
    //业务线程池个数
    private int serverWorkerThreads = 8;
    //默认业务线程池,采用fixed类型
    private int serverCallbackExecutorThreads = 0;
    //Netty I/O线程数处理Selector读写事件线程数
    private int serverSelectorThreads = 3;
    //用于限流
    private int serverOnewaySemaphoreValue = 256;
    //异步调用限流
    private int serverAsyncSemaphoreValue = 64;
    //channel的空闲时间 idleHandler实现
    private int serverChannelMaxIdleTimeSeconds = 120;
    //socket 发送缓冲区大小 默认 65535
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    //socket 接收缓冲区大小      65535
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    //是否使用PooledByteBufd
    private boolean serverPooledByteBufAllocatorEnable = true;
}
复制代码

3)scheduledExecutorService,定时任务线程池的配置详情

public class NettyServerConfig implements Cloneable {
   //Server端端口
   private int listenPort = 8888;
   //业务线程池个数
   private int serverWorkerThreads = 8;
   //默认业务线程池,采用fixed类型
   private int serverCallbackExecutorThreads = 0;
   //Netty I/O线程数处理Selector读写事件线程数
   private int serverSelectorThreads = 3;
   //用于限流
   private int serverOnewaySemaphoreValue = 256;
   //异步调用限流
   private int serverAsyncSemaphoreValue = 64;
   //channel的空闲时间 idleHandler实现
   private int serverChannelMaxIdleTimeSeconds = 120;
   //socket 发送缓冲区大小 默认 65535
   private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
   //socket 接收缓冲区大小      65535
   private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
   //是否使用PooledByteBufd
   private boolean serverPooledByteBufAllocatorEnable = true;
}
复制代码

4)调度器主要执行两个任务,10s一次检测channel是否最后一个的活跃事件,是否大于配置的超时时间,看下任务代码,其实就是类似于心跳检测,在Broker也有调度线程10s一次调用BrokerController.registerBrokerAll()方法实现路由注册等发送QUERY_DATA_VERSION事件去更新LastUpdateTimestamp字段实现心跳检测的功能,源码如下:

//扫描不活跃的的broker 信息
public void scanNotActiveBroker() {
   //获取所有live的brokerinfo
   Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
   while (it.hasNext()) {
       Entry<String, BrokerLiveInfo> next = it.next();
       //获取最新的一次updateTime时间
       long last = next.getValue().getLastUpdateTimestamp();
       //判断是否超时
       if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
           //超时的话,关闭broker的channel
           RemotingUtil.closeChannel(next.getValue().getChannel());
           it.remove();
           log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
           //移除BrokerLiveInfo,以及对应的Topic信息
           this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
       }
   }
}
复制代码

5)RouteInfoManager(NameServer存储的核心信息)

微信截图_20220531115212.png

//以下为具体属性
//NameServer 与broker的空闲时间 查过两分钟没有收到心跳 关闭该链接
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//topic 与队列的关系  以及topic 分布在哪些Broker上 每个broker存在该topic的个数
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 所有的broker信息 brokerName 为key
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//broker集群信息,每个集群包含哪些Broker,key为集群名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 当前存活的Broker,该信息不是实时的,NameServer每10S扫描一次所有的broker,根据心跳包的时间得知broker的状态
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//过滤的broker
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
复制代码

6)核心属性实体(QueueData队列属性)

//写队列:producer可以发送的queue的个数
//读队列:consumer可以拉取的队列的个数
public class QueueData implements Comparable<QueueData> { 
//borker name  
private String brokerName;  
//写队列的个数  
private int readQueueNums; 
//读队列的个数  
private int writeQueueNums; 
//权限
private int perm;       
//同步复制 还是异步复制   
private int topicSynFlag;  
}
复制代码

7)BrokerData(Broker属性)

public class BrokerData implements Comparable<BrokerData> {
    //集群名称,节点信息,分布在哪个节点信息
    private String cluster;
    //brokerName
    private String brokerName;
    //broker 对应的IP:Port,brokerId=0表示Master,大于0表示Slave。
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    //随机数,随机获取一个broker的地址
    private final Random random = new Random();
}
复制代码

8)BrokerLiveInfo(活跃的Broker)

class BrokerLiveInfo {
    //时间戳用于判断Broker是否存活
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    //socket通信
    private Channel channel;
    //ha 地址
    private String haServerAddr;
}
复制代码

9)TopicConfig(Topic配置信息)

public class TopicConfig {
    private static final String SEPARATOR = " ";
    //默认读队列数量
    public static int defaultReadQueueNums = 16;
    // 默认写队列数量
    public static int defaultWriteQueueNums = 16;
    //topic 主题名称
    private String topicName;
    //readQueue数量
    private int readQueueNums = defaultReadQueueNums;
    //write Queue数量
    private int writeQueueNums = defaultWriteQueueNums;
    //读取权限
    private int perm = PermName.PERM_READ | PermName.PERM_WRITE;
    //主题过滤方式
    private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
    //系统标志,比如事务,延时等
    private int topicSysFlag = 0;
    //是否顺序消费 
    private boolean order = false;
}
复制代码

10)TopicRouteData(Topic路由信息)

public class TopicRouteData extends RemotingSerializable {
   //是否顺序消息
   private String orderTopicConf;
   //topic分布在哪些Broker上,以及读写队列的个数
   private List<QueueData> queueDatas;
   // topic分布的Broker信息
   private List<BrokerData> brokerDatas;
   //需要过滤的address
   private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
284 4
|
11月前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
1056 1
RocketMQ消息重试机制解析!
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
316 3
|
人工智能 前端开发 Java
【Tomcat源码分析】启动过程深度解析 (二)
本文深入探讨了Tomcat启动Web应用的过程,重点解析了其加载ServletContextListener及Servlet的机制。文章从Bootstrap反射调用Catalina的start方法开始,逐步介绍了StandardServer、StandardService、StandardEngine、StandardHost、StandardContext和StandardWrapper的启动流程。每个组件通过Lifecycle接口协调启动,子容器逐层启动,直至整个服务器完全启动。此外,还详细分析了Pipeline及其Valve组件的作用,展示了Tomcat内部组件间的协作机制。
【Tomcat源码分析】启动过程深度解析 (二)
|
消息中间件 存储 SQL
代码很少,却很优秀!RocketMQ的NameServer做到了!
本文深入剖析了RocketMQ的注册中心NameServer,基于RocketMQ release-5.2.0版本。NameServer作为Broker、Producer与Consumer之间的纽带,仅由少数几个类构成,却实现了高性能与轻量化。文章详细介绍了NameServer的AP设计思想、简洁的数据结构及心跳机制。AP设计避免了复杂的分布式协议,简化了网络开销;数据结构主要包括路由表、Broker信息等;心跳机制则通过定时扫描确保Broker的活跃状态。通过这些核心设计,NameServer实现了高效稳定的注册与发现功能。
582 5
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
372 2
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
357 0
|
消息中间件 安全 fastjson
消息队列 MQ使用问题之NameServer集群是什么结构
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之如何设置nameserver监听的IP
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

热门文章

最新文章

推荐镜像

更多
  • DNS
  • 下一篇
    oss云网关配置