Broker注册到NameServer源码分析

简介: 写作目的RocketMQ一个用Java写的开源项目,而且也是阿里开源的,所以想看一看设计思路以及一些细节,所以就写了这篇博客,记录一下Broker注册到Nameserver的过程以及心跳逻辑。

前提


要会Netty吧,如果不会的话,感觉应该看不懂吧。


跟源码思路


其实很多源码的讲解都是把一个类都标上注释,其实我感觉这样的人很厉害,因为他确实对这个代码很精通。我的风格比较偷懒,我们想看哪一部分就跟哪一部分和哪个分支,其他的没必要看,这样你就能偷懒了,所以这篇文章想跟的是Broker注册到NameServer源码以及Broker与NameServer的心跳。你调试的时候不也是这样吗,哪报错了你就进哪个分支,不关心的分支我们不去分析。


注意:本文只关心Broker注册到NameServer和心跳逻辑,其他都不关心。


启动源码分析


源码版本


RocketMQ4.9.1


NameServer启动流程


20.png


启动入口我们从NamesrvStartup#main0开始


public static NamesrvController main0(String[] args) {
        try {
            //创建NameServer,其实就初始化一些变量,跳过
            NamesrvController controller = createNamesrvController(args);
            //启动controller
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }


接下来就是NamesrvStartup#start方法


public static NamesrvController start(final NamesrvController controller) throws Exception {
        //初始化,就是给一些属性初始化,其中remotingServer得到初始化
        boolean initResult = controller.initialize();
        //省略
        //启动controller
        controller.start();
        return controller;
    }


下面跟NamesrvController#start方法


public void start() throws Exception {
        //启动NettyServer服务器
        this.remotingServer.start();
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }


此时在NettyRemotingServer#start就启动了一个ServerBootstrap服务端


public void start() {
       //省略
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
       //省略
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        //省略
    }


综上:NameServer的启动流程的核心就是在NettyRemotingServer#start就启动了一个ServerBootstrap并监听9876端口


Broker启动流程


01.png


还是直接看BrokerController#start方法吧,反正前面也是debug


public void start() throws Exception {
        //省略
        //向NameServer注册自己的信息
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            向NameServer注册自己的信息
            this.registerBrokerAll(true, false, true);
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    //心跳,重复注册自己
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }


Broker给NameServer发心跳


Broker启动一个定时任务,每次都会向NameServer注册自己,不断覆盖到NameServer存的Broker的信息,从而达到心跳的效果,我只能说一个字,秀。


//BrokerController#start
public void start() throws Exception {
        //省略
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    //心跳,重复注册自己
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        //省略
    }


broker和NameServer之间维护连接


NameServer维护和Broker之间的连接


////NamesrvController#initialize
public boolean initialize() {
        //省略
        //定时任务,根据broker注册到nameServer的时间与此时此刻时间的阈值去判断该broker是否还存活
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //省略
        return true;
    }


总结


1 broker和NameServer之间的心跳我以为是发送心跳包去实现的,结果是通过不断的向nameserver注册自己实现的

2 NameServer通过定时任务不断的扫描brokerLiveTable去根据时间阈值(broker注册的时间和此时此刻的时间差距)实现维护连接


目录
相关文章
|
消息中间件 Java Shell
RocketMQ的broker启动失败解决
RocketMQ的broker启动失败解决
1287 0
|
8月前
|
消息中间件 设计模式 Java
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
235 0
|
8月前
|
消息中间件 存储 负载均衡
RocketMQ 源码分析——NameServer
- 编写优雅、高效的代码。RocketMQ作为阿里双十一交易核心链路产品,支撑千万级并发、万亿级数据洪峰。读源码可以积累编写高效、优雅代码的经验。 - 提升微观的架构设计能力,重点在思维和理念。Apache RocketMQ作为Apache顶级项目,它的架构设计是值得大家借鉴的。 - 解决工作中、学习中的各种疑难杂症。在使用RocketMQ过程中遇到消费卡死、卡顿等问题可以通过阅读源码的方式找到问题并给予解决。 - 在BATJ一线互联网公司面试中展现优秀的自己。大厂面试中,尤其是阿里系的公司,你有RocketMQ源码体系化知识,必定是一个很大的加分项。
133 0
|
8月前
|
消息中间件 存储 Kafka
RocketMQ 源码分析——Broker
1. Broker启动流程分析 2. 消息存储设计 3. 消息写入流程 4. 亮点分析:NRS与NRC的功能号设计 5. 亮点分析:同步双写数倍性能提升的CompletableFuture 6. 亮点分析:Commitlog写入时使用可重入锁还是自旋锁? 7. 亮点分析:零拷贝技术之MMAP提升文件读写性能 8. 亮点分析:堆外内存机制
123 0
|
消息中间件 Java RocketMQ
RocketMq Nameserver源码分析-01
RocketMq Nameserver源码分析-01
101 0
|
消息中间件 数据可视化 Java
RocketMq之nameserver源码阅读
RocketMq之nameserver源码阅读
91 0
RocketMq之nameserver源码阅读
|
消息中间件 存储 负载均衡
RocketMQ的路由中心:NameServer源码解析
RocketMQ的路由中心:NameServer源码解析
222 0
RocketMQ的路由中心:NameServer源码解析
|
消息中间件 RocketMQ
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
217 0
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
|
消息中间件 缓存 RocketMQ
这 6 个场景下 RocketMQ 会找不到 Broker
这 6 个场景下 RocketMQ 会找不到 Broker
177 0
这 6 个场景下 RocketMQ 会找不到 Broker
|
消息中间件 Java RocketMQ
【消息中间件】默认RocketMQ消息发送者是如何启动的?
上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。