请教一下rocketmq ,这个topic划分消息类型 , 建立topic时候怎么设置类型呢 . 翻了一圈貌似没看到有说明怎么设置?
用5.0 gRPC客户端要设置一下
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name>
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION,此回答整理自钉群“群2-Apache RocketMQ 中国开发者钉钉群”
RocketMQ中的Topic是用来区分不同的消息类型的,可以根据业务需求来创建不同的Topic。在RocketMQ中,创建Topic时并不需要指定类型,因为RocketMQ并没有提供Topic类型的概念。
当消息发送到Broker时,Broker会根据Topic的名称来判断消息应该被发送到哪个队列中,从而实现消息的分发和消费。因此,可以根据不同的业务需求来创建不同的Topic,来实现消息的分类和管理。
在使用RocketMQ时,可以通过Producer发送消息到指定的Topic,而Consumer则可以订阅指定的Topic来接收消息。这样就可以实现不同业务之间的消息隔离,从而提高系统的可维护性和可扩展性。
RocketMQ的Topic可以用来划分不同类型的消息,方便针对不同的业务场景进行配置和管理。在创建Topic时可以通过配置来设置消息类型。
在RocketMQ中,Topic和Tag是相关联的,可以通过设置Tag来设置消息类型。可以将同一类型的消息设置相同的Tag,从而在消费时进行消息类型的筛选。
Topic和Tag的关系类似于数据库中的表和字段,Topic相当于表,Tag相当于字段。在RocketMQ中,使用Producer发送消息时需要指定Topic和Tag,使用Consumer接收消息时也需要指定Topic和Tag,从而确保正确收发同一类型的消息。
例如,创建一个名为"order"的Topic,并为其中两种不同的订单消息分别设置不同的Tag:
//创建名为"order"的Topic: $ bin/mqadmin updateTopic -n localhost:9876 -t order
//为"order" Topic的不同类型订单消息分别设置不同的Tag: $ bin/mqadmin updateTopic -n localhost:9876 -t order -m "0:order1;1:order2" 如上所示,通过"-m"参数可以设置不同Tag的消息类型,"0:order1"表示Tag为"order1"的消息属于第一种类型,"1:order2"表示Tag为"order2"的消息属于第二种类型。
在实际使用中,可以根据业务需求来设置不同的Topic和Tag,以方便进行消息类型的筛选和处理。
同时,在发送和消费消息时,需要指定Topic和Tag,示例代码如下:
//发送消息 Message message = new Message("order", "order1", "Hello RocketMQ".getBytes()); SendResult result = producer.send(message);
//消费消息 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("order", "order1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt message : msgs) { System.out.println(new String(message.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); 可以看到,在发送和消费消息时,都需要指定Topic和Tag,这样才能确保发送和消费的是同一类型的消息。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/