开发者社区 > 云原生 > 正文

我无法在java代码中创建带有clusterName参数的主题。

代码如下:

public static void main(String[] args) { String nameAddress = "192.168.29.133:9876"; List topicModels = null; try { MQAdminExt mqAdminExt = getMqAdminExt(nameAddress); // createTopic("192.168.29.133:9876", "testTopic1"); //// TopicModel topicModel = getTopicInfo("192.168.29.133:9876", "testTopic1", mqAdminExt); //// System.out.println(topicModel); ClusterInfo clusterInfo = getClusterInfo(mqAdminExt); clusterInfo.getClusterAddrTable().values().forEach(clusterNames->clusterNames.forEach(clusterName-> { try { createTopic(nameAddress, "testTopic1", clusterName); } catch (SubCommandException e) { e.printStackTrace(); } })); // clusterInfo.getBrokerAddrTable().values().forEach(brokerData -> { // brokerData.getBrokerAddrs().values().forEach(broker->{ // try { // createTopic(nameAddress, "testTopic1", broker); // } catch (SubCommandException e) { // e.printStackTrace(); // } // }); // }); TopicModel topicModel = getTopicInfo(nameAddress, "testTopic1", mqAdminExt); System.out.println(topicModel); closeMqAdminExt(mqAdminExt); } catch (Exception e) { e.printStackTrace(); }

}

public static void createTopic(String namesrvAddr, String topicName, String clusterName) throws SubCommandException { UpdateTopicSubCommand updateTopicSubCommand = new UpdateTopicSubCommand(); Options options = new Options(); options.addOption("n", "namesrvAddr", true, ""); CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + updateTopicSubCommand.commandName(), createTopicArgs(namesrvAddr, topicName, clusterName), updateTopicSubCommand.buildCommandlineOptions(options), new PosixParser()); updateTopicSubCommand.execute(commandLine, options, null); }

public static String[] createTopicArgs(String namesrvAddr, String topicName, String clusterName){
    List<String> args = new ArrayList<>();
    args.add("-n "+namesrvAddr);
    args.add("-t "+topicName);
    args.add("-r 2");
    args.add("-w 2");
    args.add("-c "+clusterName);
    String[] strArr = new String[args.size()];
    return args.toArray(strArr);
}

我无法使用这些代码创建主题。它sade:

org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181) at test.queue.RocketMqUtil.createTopic(RocketMqUtil.java:160) at test.queue.RocketMqUtil.lambda$null$0(RocketMqUtil.java:46) at java.lang.Iterable.forEach(Iterable.java:75) at test.queue.RocketMqUtil.lambda$main$1(RocketMqUtil.java:44) at java.util.HashMap$Values.forEach(HashMap.java:981) at test.queue.RocketMqUtil.main(RocketMqUtil.java:44) Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:393) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180) at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275) at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222) at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83) at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154) ... 6 more org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181) at test.queue.RocketMqUtil.createTopic(RocketMqUtil.java:160) at test.queue.RocketMqUtil.lambda$null$2(RocketMqUtil.java:54) at java.util.HashMap$Values.forEach(HashMap.java:981) at test.queue.RocketMqUtil.lambda$main$3(RocketMqUtil.java:52) at java.util.HashMap$Values.forEach(HashMap.java:981) at test.queue.RocketMqUtil.main(RocketMqUtil.java:51) Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:393) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180) at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275) at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222) at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83) at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154) ... 6 more TopicModel{topicName='testTopic1', queueNums=2, queues=[Queue{queueId=0, minOffset=0, maxOffset=0, lastUpdateTimestamp=0}, Queue{queueId=1, minOffset=0, maxOffset=0, lastUpdateTimestamp=0}]} 10:22:22.374 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.29.133:10909] result: true 10:22:22.375 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.29.133:9876] result: true

Process finished with exit code 0

我的问题是:

else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();

            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                System.out.printf("create topic to %s success.%n", addr);
            }

rocket-tools 代码在这里,它总是抛出异常。我认为没有给出名称ServerAddress。

顺便说一句,当我尝试用brokeAddress创建主题时,它是成功的。

原提问者GitHub用户denglifeng3

展开
收起
芬奇福贵 2023-05-26 10:56:35 125 0
1 条回答
写回答
取消 提交回答
  • 参考:https://github.com/apache/rocketmq-externals/blob/7875f0abfc30a63ec94a32ee013305a35dda77b6/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java#L76

    原回答者GitHub用户francisoliverlee

    2023-05-26 17:19:28
    赞同 展开评论 打赏

阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。

相关电子书

更多
Spring Cloud Alibaba - 重新定义 Java Cloud-Native 立即下载
The Reactive Cloud Native Arch 立即下载
JAVA开发手册1.5.0 立即下载