开篇先上一张RocketMQ官方的经典架构图,如下图所示:
Rocketmq是开源的消息中间件,主要是四部分由NameServer,Producer,Broker,Consumer构成 1)NameServer:类似于注册中心,负责Topic和路由信息的管理 2)Producer:生产者,消息的发送端 3)Broker:消息存储,以及消息转发 4)Consumer:消费者,消费消息 NameServer:是一个近乎无状态的节点,由上面的架构图可知,每台NameServer相互独立,Broker与每台NameServer保持长连接通信。还是老规矩,怎么找NameServer的入口,就看他的启动脚本就完事了NameServer的启动脚本位于源码rocketmq/distribution/bin/mqnamesrv里由脚本可知org.apache.rocketmq.namesrv.NamesrvStartup是启动类,一起看下源码,以下源码解析均为重要方法选择性解析,全面解析的话时间不是很充足。
public static void main(String[] args) { main0(args); } public static NamesrvController main0(String[] args) { try { NamesrvController controller = createNamesrvController(args); start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; } 复制代码
1)启动类可以看出NamesrvController是整个NameServer的核心控制
看下NamesrvController的类图
//代码 // namerServer 配置信息 private final NamesrvConfig namesrvConfig; // netty配置信息 private final NettyServerConfig nettyServerConfig; // 调度任务 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread")); // 读取或变更NameServer的属性 加载NamerServer的属性到内存 private final KVConfigManager kvConfigManager; // 记录Broker,Topic信息以及关联 private final RouteInfoManager routeInfoManager; // netty server端 private RemotingServer remotingServer; // 处理netty事件接口(连接,关闭,异常,心跳事件) private BrokerHousekeepingService brokerHousekeepingService; // 业务线程池private ExecutorService remotingExecutor;全局配置类 数据版本号,存储base路径等等 private Configuration configuration; // 监听证书文件是否有有变化 private FileWatchService fileWatchService; 复制代码
2)NamesrvController属性
NamesrvConfig:主要是NameServer的一些配置kvConfigPath,rocketmqHome等
如下NettyServerConfig(netty相关配置)源码
public class NettyServerConfig implements Cloneable { //Server端端口 private int listenPort = 8888; //业务线程池个数 private int serverWorkerThreads = 8; //默认业务线程池,采用fixed类型 private int serverCallbackExecutorThreads = 0; //Netty I/O线程数处理Selector读写事件线程数 private int serverSelectorThreads = 3; //用于限流 private int serverOnewaySemaphoreValue = 256; //异步调用限流 private int serverAsyncSemaphoreValue = 64; //channel的空闲时间 idleHandler实现 private int serverChannelMaxIdleTimeSeconds = 120; //socket 发送缓冲区大小 默认 65535 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; //socket 接收缓冲区大小 65535 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; //是否使用PooledByteBufd private boolean serverPooledByteBufAllocatorEnable = true; } 复制代码
3)scheduledExecutorService,定时任务线程池的配置详情
public class NettyServerConfig implements Cloneable { //Server端端口 private int listenPort = 8888; //业务线程池个数 private int serverWorkerThreads = 8; //默认业务线程池,采用fixed类型 private int serverCallbackExecutorThreads = 0; //Netty I/O线程数处理Selector读写事件线程数 private int serverSelectorThreads = 3; //用于限流 private int serverOnewaySemaphoreValue = 256; //异步调用限流 private int serverAsyncSemaphoreValue = 64; //channel的空闲时间 idleHandler实现 private int serverChannelMaxIdleTimeSeconds = 120; //socket 发送缓冲区大小 默认 65535 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; //socket 接收缓冲区大小 65535 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; //是否使用PooledByteBufd private boolean serverPooledByteBufAllocatorEnable = true; } 复制代码
4)调度器主要执行两个任务,10s一次检测channel是否最后一个的活跃事件,是否大于配置的超时时间,看下任务代码,其实就是类似于心跳检测,在Broker也有调度线程10s一次调用BrokerController.registerBrokerAll()方法实现路由注册等发送QUERY_DATA_VERSION事件去更新LastUpdateTimestamp字段实现心跳检测的功能,源码如下:
//扫描不活跃的的broker 信息 public void scanNotActiveBroker() { //获取所有live的brokerinfo Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); //获取最新的一次updateTime时间 long last = next.getValue().getLastUpdateTimestamp(); //判断是否超时 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //超时的话,关闭broker的channel RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); //移除BrokerLiveInfo,以及对应的Topic信息 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } } 复制代码
5)RouteInfoManager(NameServer存储的核心信息)
//以下为具体属性 //NameServer 与broker的空闲时间 查过两分钟没有收到心跳 关闭该链接 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; //读写锁 private final ReadWriteLock lock = new ReentrantReadWriteLock(); //topic 与队列的关系 以及topic 分布在哪些Broker上 每个broker存在该topic的个数 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // 所有的broker信息 brokerName 为key private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //broker集群信息,每个集群包含哪些Broker,key为集群名称 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // 当前存活的Broker,该信息不是实时的,NameServer每10S扫描一次所有的broker,根据心跳包的时间得知broker的状态 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //过滤的broker private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; 复制代码
6)核心属性实体(QueueData队列属性)
//写队列:producer可以发送的queue的个数 //读队列:consumer可以拉取的队列的个数 public class QueueData implements Comparable<QueueData> { //borker name private String brokerName; //写队列的个数 private int readQueueNums; //读队列的个数 private int writeQueueNums; //权限 private int perm; //同步复制 还是异步复制 private int topicSynFlag; } 复制代码
7)BrokerData(Broker属性)
public class BrokerData implements Comparable<BrokerData> { //集群名称,节点信息,分布在哪个节点信息 private String cluster; //brokerName private String brokerName; //broker 对应的IP:Port,brokerId=0表示Master,大于0表示Slave。 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; //随机数,随机获取一个broker的地址 private final Random random = new Random(); } 复制代码
8)BrokerLiveInfo(活跃的Broker)
class BrokerLiveInfo { //时间戳用于判断Broker是否存活 private long lastUpdateTimestamp; private DataVersion dataVersion; //socket通信 private Channel channel; //ha 地址 private String haServerAddr; } 复制代码
9)TopicConfig(Topic配置信息)
public class TopicConfig { private static final String SEPARATOR = " "; //默认读队列数量 public static int defaultReadQueueNums = 16; // 默认写队列数量 public static int defaultWriteQueueNums = 16; //topic 主题名称 private String topicName; //readQueue数量 private int readQueueNums = defaultReadQueueNums; //write Queue数量 private int writeQueueNums = defaultWriteQueueNums; //读取权限 private int perm = PermName.PERM_READ | PermName.PERM_WRITE; //主题过滤方式 private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG; //系统标志,比如事务,延时等 private int topicSysFlag = 0; //是否顺序消费 private boolean order = false; } 复制代码
10)TopicRouteData(Topic路由信息)
public class TopicRouteData extends RemotingSerializable { //是否顺序消息 private String orderTopicConf; //topic分布在哪些Broker上,以及读写队列的个数 private List<QueueData> queueDatas; // topic分布的Broker信息 private List<BrokerData> brokerDatas; //需要过滤的address private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; }



