45. 源代码阅读-RocketMQ-tools-阿里云开发者社区

开发者社区> 技术小胖子> 正文

45. 源代码阅读-RocketMQ-tools

简介:
+关注继续查看

一. 简要介绍

RocketMQ-tools分为3部分

  1. admin
  2. command
  3. monitor

下面一一介绍

二. admin

提供了管理操作接口

三. command

command提供了命令行控制MQ的一些方法。

启动方法

进入RocketMQ安装目录,执行sh bin/mqadmin

有几个参数的执行方法

1. sh bin/mqadmin

打印命令提示,如下:

The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   queryMsgByUniqueKey  Query Message by Unique key
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
     ...

2. sh bin/mqadmin help xxx

打印某个命令的提示参数,比如sh bin/mqadmin helop updatetopic(不需要区分大小写)

[root@bogon apache-rocketmq]# sh bin/mqadmin help updatetopic
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
       -t <arg> [-u <arg>] [-w <arg>]
 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false
 -w,--writeQueueNums <arg>   set write queue nums

3. sh bin/mqadmin xx 具体执行某个命令

比如 sh bin/mqadmin updatetopic -x xxx

源代码分析

首先来看bin/mqadmin这个启动脚本

1. bin/mqadmin

if [ -z "$ROCKETMQ_HOME" ] ; then
  ## resolve links - $0 may be a link to maven's home
  PRG="$0"

  # need this for relative symlinks
  while [ -h "$PRG" ] ; do
    ls=`ls -ld "$PRG"`
    link=`expr "$ls" : '.*-> \(.*\)$'`
    if expr "$link" : '/.*' > /dev/null; then
      PRG="$link"
    else
      PRG="`dirname "$PRG"`/$link"
    fi
  done

  saveddir=`pwd`

  ROCKETMQ_HOME=`dirname "$PRG"`/..

  # make it fully qualified
  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`

  cd "$saveddir"
fi

export ROCKETMQ_HOME

sh ${ROCKETMQ_HOME}/bin/tools.sh org.apache.rocketmq.tools.command.MQAdminStartup $@

前面在设置ROCKETMQ_HOME环境变量,然后启动MQAdminStartup这个类。

2. MQAdminStartup

public class MQAdminStartup {
    protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();

    public static void main(String[] args) {
        main0(args, null);
    }

    public static void main0(String[] args, RPCHook rpcHook) {
        ...

        initCommand();

        try {
            initLogback();
            switch (args.length) {
                case 0:
                    printHelp();
                    break;
                case 2:
                    if (args[0].equals("help")) {
                        SubCommand cmd = findSubCommand(args[1]);
                        if (cmd != null) {
                            Options options = ServerUtil.buildCommandlineOptions(new Options());
                            options = cmd.buildCommandlineOptions(options);
                            if (options != null) {
                                ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
                            }
                        } else {
                            System.out.printf("The sub command \'" + args[1] + "\' not exist.%n");
                        }
                        break;
                    }
                case 1:
                default:
                    SubCommand cmd = findSubCommand(args[0]);
                    if (cmd != null) {
                        ....

                        cmd.execute(commandLine, options, rpcHook);
                    } else {
                        System.out.printf("The sub command \'" + args[0] + "\' not exist.%n");
                    }
                    break;
            }
        } 
                ...
    }

    public static void initCommand() {
        initCommand(new UpdateTopicSubCommand());
        initCommand(new DeleteTopicSubCommand());
        initCommand(new UpdateSubGroupSubCommand());
        initCommand(new DeleteSubscriptionGroupCommand());
        initCommand(new UpdateBrokerConfigSubCommand());
        initCommand(new UpdateTopicPermSubCommand());

        initCommand(new TopicRouteSubCommand());
        initCommand(new TopicStatusSubCommand());
        initCommand(new TopicClusterSubCommand());

        initCommand(new BrokerStatusSubCommand());
        initCommand(new QueryMsgByIdSubCommand());
        initCommand(new QueryMsgByKeySubCommand());
        initCommand(new QueryMsgByUniqueKeySubCommand());
        initCommand(new QueryMsgByOffsetSubCommand());
        initCommand(new QueryMsgByUniqueKeySubCommand());
        initCommand(new PrintMessageSubCommand());
        initCommand(new PrintMessageByQueueCommand());
        initCommand(new SendMsgStatusCommand());
        initCommand(new BrokerConsumeStatsSubCommad());

        initCommand(new ProducerConnectionSubCommand());
        initCommand(new ConsumerConnectionSubCommand());
        initCommand(new ConsumerProgressSubCommand());
        initCommand(new ConsumerStatusSubCommand());
        initCommand(new CloneGroupOffsetCommand());

        initCommand(new ClusterListSubCommand());
        initCommand(new TopicListSubCommand());

        initCommand(new UpdateKvConfigCommand());
        initCommand(new DeleteKvConfigCommand());

        initCommand(new WipeWritePermSubCommand());
        initCommand(new ResetOffsetByTimeCommand());

        initCommand(new UpdateOrderConfCommand());
        initCommand(new CleanExpiredCQSubCommand());
        initCommand(new CleanUnusedTopicCommand());

        initCommand(new StartMonitoringSubCommand());
        initCommand(new StatsAllSubCommand());

        initCommand(new AllocateMQSubCommand());

        initCommand(new CheckMsgSendRTCommand());
        initCommand(new CLusterSendMsgRTCommand());

        initCommand(new GetNamesrvConfigCommand());
        initCommand(new UpdateNamesrvConfigCommand());
        initCommand(new GetBrokerConfigCommand());

        initCommand(new QueryConsumeQueueCommand());
    }
        ...

逻辑很简单,

  • 首先初始化可以支持的Command - initCommand
  • initLogback根据logback_tools.xml初始化,主要是日志系统。
  • 然后根据传入过来的参数,选择不同的执行分支,也就是上面举例的参数。

3. 某个指令执行过程

以updatetopic为例

....
cmd.execute(commandLine, options, rpcHook);

进入UpdateTopicSubCommand.execute(xx)方法

@Override
    public void execute(final CommandLine commandLine, final Options options,
        RPCHook rpcHook) throws SubCommandException {
        ...
        try {
            TopicConfig topicConfig = new TopicConfig();
            topicConfig.setReadQueueNums(8);
            topicConfig.setWriteQueueNums(8);
            topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

            // readQueueNums
            if (commandLine.hasOption('r')) {
                topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
            }

            // writeQueueNums
            if (commandLine.hasOption('w')) {
                topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
            }

            ...

            if (commandLine.hasOption('b')) {
                String addr = commandLine.getOptionValue('b').trim();

                defaultMQAdminExt.start();
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

                ...
                return;

            } else if (commandLine.hasOption('c')) {
                String clusterName = commandLine.getOptionValue('c').trim();

                defaultMQAdminExt.start();

                Set<String> masterSet =
                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
                for (String addr : masterSet) {
                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                    System.out.printf("create topic to %s success.%n", addr);
                }

                ...
                }

                System.out.printf("%s", topicConfig);
                return;
            }

            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
        } finally {
            defaultMQAdminExt.shutdown();
        }
    }

execute的逻辑就是获取传入的参数,-t是必须的,然后还必须带上-b或者-c才会执行。
如果没有执行,那么会打印updatetopic的参数列表。
-b代表某个broker, -c代表集群

所以大概格式是: sh ./bin/mqadmin updatetopic -t topictest -b xxx

其他参数是可选的。

4. 调用执行

如果确定要更新topic,那么就会调用RocketMQ-cli里面的接口进行更新。
更新成功会打印更新成功输出,否则会报异常。

附上所有命令的操作参数
https://www.cnblogs.com/zyguo/p/4962425.html

四. monitor

监控相关

启动类是参数是startMonitoring

sh bin/mqadmin startMonitoring

代码如下:

public class StartMonitoringSubCommand implements SubCommand {
    private final Logger log = ClientLogger.getLog();

    @Override
    public String commandName() {
        return "startMonitoring";
    }

    @Override
    public String commandDesc() {
        return "Start Monitoring";
    }

    @Override
    public Options buildCommandlineOptions(Options options) {
        return options;
    }

    @Override
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
        try {
            MonitorService monitorService =
                new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);

            monitorService.start();
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
        }
    }
}

五. 命令执行和相关代码分析

获取所有的topic -- topiclist

1. 命令如下

sh bin/mqadmin topiclist -n localhost:9876

2. 结果如下:

huangrongweideMacBook-Pro:apache-rocketmq huangrongwei$ sh bin/mqadmin topiclist -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
%RETRY%lalaase_rename_unique_group_name_4
%RETRY%please_rename_unique_group_name_4
huangrongweideMacBook-Pro.local
%RETRY%bybybse_rename_unique_group_name_4
BenchmarkTest
OFFSET_MOVED_EVENT
%RETRY%phihomebse_rename_unique_group_name_4
wangyuan.freecomm-networks.com
TBW102
SELF_TEST_TOPIC
mmmzzz
DefaultCluster

3. 参数

  • -n表示要显示连接到的namesrv
  • -c

4. 实现这个功能的类

rocketmq/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java

5. 代码分析

a. 入口类是MQAdminStartup.main方法,走的是case 1分支,如下:
public static void main0(String[] args, RPCHook rpcHook) {
        ...

        initCommand();

        try {
                                ...
                case 1:
                default:
                    SubCommand cmd = findSubCommand(args[0]);
                    if (cmd != null) {
                        ...

                        if (commandLine.hasOption('n')) {
                            String namesrvAddr = commandLine.getOptionValue('n');
                            System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
                        }

                        cmd.execute(commandLine, options, rpcHook);
                    } else {
                        System.out.printf("The sub command \'" + args[0] + "\' not exist.%n");
                    }
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
}

如果commandLine包含n参数,也就是如果输入命令有带-n的参数,比如sh bin/mqadmin topiclist -n localhost:9876。那么把它带的参数值拿出来,然后设置到SystemProperty里面。

System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);

这样做是为了其他地方可以拿这个参数,也就是RocketMQ传递namesrv address的时候,不是用普通的参数传递,而是用系统属性。

这样做有好处也有不好的地方吧,好处就是省心,一个地址设置了,很多地方都有可以得到这个值。不需要每个参数传来传去。不好的地方就是如果设置的地方多了,就容易混淆,互相影响。

然后执行command的execute方法, 也就是TopicListSubCommand.execute方法。

cmd.execute(commandLine, options, rpcHook);

所有的command都实现了SubCommand接口。

public class TopicListSubCommand implements SubCommand {}

SubCommand提供了几个接口方法

public interface SubCommand {
    String commandName(); //命令名称,用户用这个名称调用这个命令,不区分大小写,比如topiclist。

    String commandDesc(); //命令描述

    Options buildCommandlineOptions(final Options options); //参数构成

    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;//命令执行
}
b. TopicListSubCommand.execute
public void execute(final CommandLine commandLine, final Options options,
        RPCHook rpcHook) throws SubCommandException {
        ...

        try {
            ...
            defaultMQAdminExt.start();
            if (commandLine.hasOption('c')) {
                ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();

                ...
                TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
                ...
            } else {
                ...
                TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
                for (String topic : topicList.getTopicList()) {
                    System.out.printf("%s%n", topic);
                }
            }
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
        } finally {
            defaultMQAdminExt.shutdown();
        }
    }

这个类主要就是根据传入的参数是否有-c 或者-n 获取相应的topic,然后打印出来。

以-n为例,它的逻辑就是去namesrv获取所有的topic,然后打印出来。


     本文转自rongwei84n 51CTO博客,原文链接:http://blog.51cto.com/483181/2043859,如需转载请自行联系原作者


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
源码分析 RocketMQ DLedger 多副本之 Leader 选主
本文将按照《RocketMQ 多副本前置篇:初探raft协议》的思路来学习RocketMQ选主逻辑。首先先回顾一下关于Leader的一些思考: 节点状态需要引入3种节点状态:Follower(跟随者)、Candidate(候选者),该状态下的节点会发起投票请求,Leader(主节点)。
2204 0
源码阅读技巧篇:RocketMQ DLedger 多副本即主从切换专栏回顾
RocketMQ DLedger 多副本即主从切换专栏总共包含9篇文章,时间跨度大概为2个月的时间,笔者觉得授人以鱼不如授人以渔,借以这个系列来展示该系列的创作始末,展示笔者阅读源码的技巧。 首先在下决心研读 RocketMQ DLedger 多副本(主从切换)的源码之前,首先还是要通过官方的分享、百度等途径对该功能进行一些基本的了解。
1459 0
阿里云服务器ECS远程登录用户名密码查询方法
阿里云服务器ECS远程连接登录输入用户名和密码,阿里云没有默认密码,如果购买时没设置需要先重置实例密码,Windows用户名是administrator,Linux账号是root,阿小云来详细说下阿里云服务器远程登录连接用户名和密码查询方法
11065 0
RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
初步展示了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,然后在将该消息成功存入到消息服务器后,回调TransactionListener#executeLocalTransaction,执行本地事务状态回调函数,然后根据该方法的返回值,结束事务。
2775 0
阿里云服务器安全组设置内网互通的方法
虽然0.0.0.0/0使用非常方便,但是发现很多同学使用它来做内网互通,这是有安全风险的,实例有可能会在经典网络被内网IP访问到。下面介绍一下四种安全的内网互联设置方法。 购买前请先:领取阿里云幸运券,有很多优惠,可到下文中领取。
11782 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
12474 0
Rocketmq源码分析(一)整体架构
1 系列 整体架构图 producer端发送消息 broker端接收消息 broker端消息的存储 consumer消费消息 分布式事务的实现 定时消息的实现 关于顺序消费 关于重复消息 关于高可用 2 整体架构图 先来看下官方给出
14131 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
6782 0
21119
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载