代码如下:
public static void main(String[] args) { String nameAddress = "192.168.29.133:9876"; List topicModels = null; try { MQAdminExt mqAdminExt = getMqAdminExt(nameAddress); // createTopic("192.168.29.133:9876", "testTopic1"); //// TopicModel topicModel = getTopicInfo("192.168.29.133:9876", "testTopic1", mqAdminExt); //// System.out.println(topicModel); ClusterInfo clusterInfo = getClusterInfo(mqAdminExt); clusterInfo.getClusterAddrTable().values().forEach(clusterNames->clusterNames.forEach(clusterName-> { try { createTopic(nameAddress, "testTopic1", clusterName); } catch (SubCommandException e) { e.printStackTrace(); } })); // clusterInfo.getBrokerAddrTable().values().forEach(brokerData -> { // brokerData.getBrokerAddrs().values().forEach(broker->{ // try { // createTopic(nameAddress, "testTopic1", broker); // } catch (SubCommandException e) { // e.printStackTrace(); // } // }); // }); TopicModel topicModel = getTopicInfo(nameAddress, "testTopic1", mqAdminExt); System.out.println(topicModel); closeMqAdminExt(mqAdminExt); } catch (Exception e) { e.printStackTrace(); }
}
public static void createTopic(String namesrvAddr, String topicName, String clusterName) throws SubCommandException { UpdateTopicSubCommand updateTopicSubCommand = new UpdateTopicSubCommand(); Options options = new Options(); options.addOption("n", "namesrvAddr", true, ""); CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + updateTopicSubCommand.commandName(), createTopicArgs(namesrvAddr, topicName, clusterName), updateTopicSubCommand.buildCommandlineOptions(options), new PosixParser()); updateTopicSubCommand.execute(commandLine, options, null); }
public static String[] createTopicArgs(String namesrvAddr, String topicName, String clusterName){
List<String> args = new ArrayList<>();
args.add("-n "+namesrvAddr);
args.add("-t "+topicName);
args.add("-r 2");
args.add("-w 2");
args.add("-c "+clusterName);
String[] strArr = new String[args.size()];
return args.toArray(strArr);
}
我无法使用这些代码创建主题。它sade:
org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181) at test.queue.RocketMqUtil.createTopic(RocketMqUtil.java:160) at test.queue.RocketMqUtil.lambda$null$0(RocketMqUtil.java:46) at java.lang.Iterable.forEach(Iterable.java:75) at test.queue.RocketMqUtil.lambda$main$1(RocketMqUtil.java:44) at java.util.HashMap$Values.forEach(HashMap.java:981) at test.queue.RocketMqUtil.main(RocketMqUtil.java:44) Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:393) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180) at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275) at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222) at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83) at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154) ... 6 more org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181) at test.queue.RocketMqUtil.createTopic(RocketMqUtil.java:160) at test.queue.RocketMqUtil.lambda$null$2(RocketMqUtil.java:54) at java.util.HashMap$Values.forEach(HashMap.java:981) at test.queue.RocketMqUtil.lambda$main$3(RocketMqUtil.java:52) at java.util.HashMap$Values.forEach(HashMap.java:981) at test.queue.RocketMqUtil.main(RocketMqUtil.java:51) Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:393) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180) at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275) at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222) at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83) at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154) ... 6 more TopicModel{topicName='testTopic1', queueNums=2, queues=[Queue{queueId=0, minOffset=0, maxOffset=0, lastUpdateTimestamp=0}, Queue{queueId=1, minOffset=0, maxOffset=0, lastUpdateTimestamp=0}]} 10:22:22.374 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.29.133:10909] result: true 10:22:22.375 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.29.133:9876] result: true
Process finished with exit code 0
我的问题是:
else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
}
rocket-tools 代码在这里,它总是抛出异常。我认为没有给出名称ServerAddress。
顺便说一句,当我尝试用brokeAddress创建主题时,它是成功的。
原提问者GitHub用户denglifeng3
参考:https://github.com/apache/rocketmq-externals/blob/7875f0abfc30a63ec94a32ee013305a35dda77b6/rocketmq-console/src/main/java/org/apache/rocketmq/console/controller/TopicController.java#L76
原回答者GitHub用户francisoliverlee
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。