RocketMQ5.0中的mqadmin在创建延时消息的主题时不支持官方文档中说的添加额外属性指定主题类型。大家有遇到过么?怎么解决的呢?
在RocketMQ 5.0中,确实存在无法通过mqadmin添加额外属性指定主题类型的问题。这是因为在5.0版本中,延时消息的主题类型默认为DELAY
,无法通过mqadmin命令行工具进行修改。
如果您需要创建其他类型的主题(如NORMAL
),可以通过编程方式使用RocketMQ提供的Java SDK来创建主题,并在创建主题时指定类型。
以下是使用Java SDK创建主题并指定类型的示例代码:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.admin.TopicStatus;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
public class CreateTopicExample {
public static void main(String[] args) throws RemotingException, MQClientException, InterruptedException {
// 创建MQAdminExt对象
MQAdminExt admin = new DefaultMQAdminExt();
admin.start();
// 创建主题
String topic = "your_topic"; // 替换成您的主题名称
String brokerAddr = "127.0.0.1:9876"; // 替换成您的Broker地址
int queueNum = 4; // 主题队列数
int topicType = TopicStatus.NORMAL_TOPIC; // 主题类型,NORMAL_TOPIC为普通类型
try {
// 创建主题
admin.createAndUpdateTopicConfig(brokerAddr, topic, queueNum, topicType);
// 更新主题配置
UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
cmd.setAdminTool(admin);
cmd.execute(new String[]{"-b", brokerAddr, "-t", topic});
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭MQAdminExt对象
admin.shutdown();
}
}
}
上述代码示例中,通过创建DefaultMQAdminExt
对象来与RocketMQ进行交互。首先使用createAndUpdateTopicConfig
方法创建主题,并指定主题类型为NORMAL_TOPIC
。然后使用UpdateTopicSubCommand
类的execute
方法更新主题配置。
您需要将代码中的brokerAddr
和topic
替换为您实际使用的Broker地址和主题名称。
通过编程方式创建主题并指定类型可以解决mqadmin无法添加额外属性指定主题类型的问题。但是需要注意,这种方式需要您具备Java编程的基础,并且需要在您的项目中引入RocketMQ的Java SDK。
消息仅支持在MessageType为Delay的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。
使用示例
和普通消息相比,定时消费发送时,必须设置定时触发的目标时间戳。
以Java语言为例,使用定时消息示例参考如下:
MessageBuilder messageBuilder = new MessageBuilder();
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息体
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/