开发者社区 问答 正文

消息过滤怎么进行?


本文描述 MQ 消费者如何根据 Tag 在 MQ 服务端完成消息过滤。
Tag,即消息标签、消息类型,用来区分某个 MQ 的 Topic 下的消息分类。MQ 允许消费者按照 Tag 对消息进行过滤,确保消费者最终只消费到他关心的消息类型。
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,比如订单创建消息(order)、支付消息(pay)、物流消息(logistics)。这些消息会发送到 Topic 为 Trade_Topic 的队列中,被各个不同的系统所接收,比如支付系统、物流系统、交易成功率分析系统、实时计算系统等。其中,物流系统只需接收物流类型的消息(logistics),而实时计算系统需要接收所有和交易相关(order、pay、logistics)的消息。

说明:针对消息归类,您可以选择创建多个 Topic, 或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,比如全集和子集的关系,流程先后的关系。

参考示例


发送消息
发送消息时,每条消息必须指明消息类型 Tag:

  1. Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());

消费方式-1
消费者如需订阅某 Topic 下所有类型的消息,Tag 用符号 * 表示:
  1. consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
  2.     public Action consume(Message message, ConsumeContext context) {
  3.         System.out.println(message.getMsgID());
  4.         return Action.CommitMessage;
  5.     }
  6. });

消费方式-2
消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag:
  1. consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
  2.     public Action consume(Message message, ConsumeContext context) {
  3.         System.out.println(message.getMsgID());
  4.         return Action.CommitMessage;
  5.     }
  6. });

消费方式-3
消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用 || 分隔:
  1. consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
  2.     public Action consume(Message message, ConsumeContext context) {
  3.         System.out.println(message.getMsgID());
  4.         return Action.CommitMessage;
  5.     }
  6. });

消费方式-4(错误示例)
同一个消费者多次订阅某 Topic 下的不同 Tag,后者会覆盖前者:
  1. //如下错误代码中,consumer只能接收到MQ_TOPIC下TagB的消息,而不能接收TagA的消息。
  2. consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
  3.     public Action consume(Message message, ConsumeContext context) {
  4.         System.out.println(message.getMsgID());
  5.         return Action.CommitMessage;
  6.     }
  7. });
  8. consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
  9.     public Action consume(Message message, ConsumeContext context) {
  10.         System.out.println(message.getMsgID());
  11.         return Action.CommitMessage;
  12.     }
  13. });

展开
收起
猫饭先生 2017-10-26 13:52:40 2002 分享
分享
版权
举报
阿里云 AI 助理回答

在MQ(消息队列)场景中,消费者根据Tag进行消息过滤是实现精准消息订阅的关键机制。以下是如何利用Tag在MQ服务端完成消息过滤的详细说明:

Tag的基本概念与作用

  • 定义:Tag是附加在消息上的标签,用于对同一Topic下的消息进行二级分类。
  • 目的:确保消费者仅接收其关注的消息类型,提高消息处理的针对性和效率。

消息发送时设置Tag

  • 生产者在发送消息至指定Topic时,需为每条消息指定一个Tag。例如:
    Message msg = new Message("Trade_Topic", "logistics", "物流信息".getBytes());
    

    这里"logistics"即为消息的Tag,表示这是一条物流相关的消息。

消费者订阅消息的方式

订阅所有Tag

  • 若要订阅某Topic下所有类型的消息,使用特殊符号*
    consumer.subscribe("Trade_Topic", "*", new MessageListener() {
      // 处理逻辑
    });
    

订阅特定Tag

  • 对于只关心特定类型消息的消费者,明确指定Tag:
    consumer.subscribe("Trade_Topic", "pay", new MessageListener() {
      // 处理支付相关消息的逻辑
    });
    

订阅多个Tag

  • 需要接收多种类型消息时,使用||分隔不同的Tag:
    consumer.subscribe("Trade_Topic", "order||pay", new MessageListener() {
      // 处理订单和支付相关消息的逻辑
    });
    

注意事项

  • 订阅覆盖问题:同一个消费者多次订阅同一Topic的不同Tag时,后面的订阅会覆盖前面的设置。因此,如果需要同时消费多个Tag的消息,应一次性在订阅表达式中列出所有Tag。
  • 订阅关系一致性:在同一个消费者分组内,所有消费者的订阅关系包括Tag过滤规则必须保持一致,以避免消息消费异常。

应用场景示例

  • 在电商交易场景中,如上所述,通过为不同业务环节的消息分配不同Tag(如orderpaylogistics),并让对应的系统(如物流系统、实时计算系统)按需订阅这些Tag,可以高效地实现消息的定向传递和处理。

综上所述,Tag机制在MQ中扮演着消息分类与过滤的核心角色,它允许生产者灵活标记消息内容,并帮助消费者精确订阅感兴趣的消息类型,从而提升消息处理系统的整体效率和准确性。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等