针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。
网上有很多分析相关使用方式的文章,虽然分析的结果都是“不可以”,但我们可以通过其他的一些方案来进行解决。
自主搭建的RocketMQ
通过自主搭建RocketMQ,然后通过SpringBoot进行集成实现,可以参考在公众号【程序新视界】中的文章《Spring Boot快速集成RocketMQ实战教程》,可关注公众号搜索,也可以关注公众号之后回复“1003”,完整的实战步骤。
这里我们只摘取其中消费者的部分代码:
@Service @RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED , selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED) public class MqRegisteredListenerDemo implements RocketMQListener<String> { private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class); @Override public void onMessage(String message) { log.info("received registered message: {}", message); } }
这是其中一个消费者,消费的topic为MqTopicConstant.DEMO_TOPIC,consumerGroup为REGISTERED的,tag便是selectorExpression指定的REGISTERED的tag。
针对同一的topic,另外一个tag的消费者的实现如下:
@Service @RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY , selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY) public class MqModifyListenerDemo implements RocketMQListener<String> { private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class); @Override public void onMessage(String message) { log.info("received modify message: {}", message); } }
我们可以看到topic是同一个,但consumerGroup和tag不同。这说明什么?这说明只要消费者的consumerGroup不同,那么topic相同的情况下,也可以通过tag进行区分的。
关于其他源码就不再这里贴出了,详情可关注公众号看对应文章。
基于云服务的RocketMQ
基于云服务的RocketMQ与自主搭建的基本一致,我们只要确保groupId(阿里云的叫法)不同,那么同一topic下的tag是可以进行区分处理的。
具体处理这里也只贴出部分代码:
@Configuration public class ConsumerClient { @Resource private MqConfigProperties mqConfigProperties; @Resource private EquipmentMessageListener equipmentMessageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); // 配置文件 Properties properties = mqConfigProperties.getMqProperties(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigProperties.getGroupId()); // 将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); // 订阅关系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(); // --------业务板块开始-------- Subscription subscription = new Subscription(); // 设置需要消费的消息所属的topic subscription.setTopic(MqConfigProperties.getInnerTopic()); // 设置需要消费的消息所属的tag subscription.setExpression(MqConfigProperties.getEquipmentMonitorTag()); // 实现MessageListener接口,并且在consume方法中实现消费逻辑 subscriptionTable.put(subscription, equipmentMessageListener); //订阅多个topic如上面设置 // --------业务板块结束-------- // 将订阅者消息放入consumerBean中,在Spring初始加载该bean时,监听MQ中的Topic和tag下的消息 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }
在上面的代码中,重点是业务板块部分的代码,如果在订阅关系中重新将业务板块内的代码copy一份,然后修改对应的Expression值(也就是tag值),那么基本上是不会成功的。
往往发送大量消息,只能够收到一部分。其他的会被覆盖掉。当然,如果你想采用不同的topic来处理,只需将业务板块中的内容重新修改,添加到subscriptionTable中即可。
那么,如何解决标题中的问题呢?思路与第一种方案一样,阿里云这里只是创建了一个ConsumerBean,而上面的自主搭建时采用了多个Consumer。那么解决方案就是:初始化多个ConsumerBean,每个ConsumerBean中的配置不同的groupId和tag,同时注册不同的监听器。
如此一来,就可以监听一个topic下的不同tag了。
原理分析
两个一样的ConsumerGroup的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。
这种现象的原因是:消息的分配是Broker决定的,而不是Consumer端,Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。
这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?
那是因为:集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。
如果换成BROADCASTING,那后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。
如果还有其他相关问题,也可关注公众号“程序新视界”,相互沟通学习。