RocketMQ Tag 详解!

简介: 本文详细介绍了 RocketMQ 中 Tag 的原理及其应用场景。Tag 是一种消息过滤机制,允许生产者在发送消息时指定标签,消费者据此选择性消费。文章通过源码分析展示了 Tag 在消息发送、存储及消费阶段的作用,并提供了完整的示例代码。尽管 Tag 功能简单高效,但也存在单一维度过滤等局限性。适合需要高效、低延迟消息传递的场景,如日志监控、电商系统等。

你好,我是猿java。

Tag 是 RocketMQ 提供的一种消息过滤机制,允许生产者在发送消息时指定一个或多个标签,消费者则可以根据这些标签来选择性地消费消息。这篇文章,我们将详细介绍 RocketMQ 中 Tag 的原理、源码分析以及示例。

Tag 的原理

在 RocketMQ 中,Tag 主要用于消息过滤。每个消息可以携带一个 Tag,消费者可以根据 Tag 来订阅特定的消息,从而实现消息的过滤和分类处理。

消息发送阶段

生产者在发送消息时,可以指定一个 Tag。这个 Tag 会被附加到消息的元数据中,并存储在 RocketMQ 的消息存储系统中。

消息存储阶段

消息被存储在 RocketMQ 的 Broker 中,消息的元数据(包括 Tag)也会被存储。

消息消费阶段

消费者在订阅消息时,可以指定要消费的 Tag。Broker 会根据消费者订阅的 Tag,将符合条件的消息投递给消费者。

源码分析

为了更好的理解 Tag的原理,我们通过 RocketMQ 中Tag 相关的几个主要代码片段进行演示。

生产者发送消息时的代码

// 创建消息实例,并指定Topic和Tag
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

// 发送消息
SendResult sendResult = producer.send(msg);

Message 类中,Tag 是通过构造函数传递的,并存储在 Message 对象的 tags 字段中。

消费者订阅消息时的代码

// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");

// 订阅Topic,并指定Tag
consumer.subscribe("TopicTest", "TagA");

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
   
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
        for (MessageExt msg : msgs) {
   
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 启动消费者
consumer.start();

DefaultMQPushConsumer 类中,通过 subscribe 方法指定要订阅的 Topic 和 Tag,RocketMQ 内部会根据订阅的 Tag 进行消息过滤。

示例

下面是一个完整的示例,演示如何使用 RocketMQ 的 Tag 功能。

生产者代码

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
   
    public static void main(String[] args) throws Exception {
   
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.setNamesrvAddr("localhost:9876");

        // 启动生产者
        producer.start();

        // 发送消息
        for (int i = 0; i < 10; i++) {
   
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

消费者代码

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
   
    public static void main(String[] args) throws Exception {
   
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅Topic,并指定Tag
        consumer.subscribe("TopicTest", "TagA");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
                for (MessageExt msg : msgs) {
   
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

尽管 RocketMQ 的 Tag 功能在消息过滤和分类处理方面提供了极大的便利,但也有其优缺点。下面详细分析一下:

优点

简单易用

Tag 的使用非常简单,生产者只需在发送消息时指定 Tag,消费者在订阅消息时指定相应的 Tag 即可。

高效过滤

通过 Tag 进行消息过滤,减少了消费者处理不相关消息的开销,从而提高了系统的性能。

灵活性高

支持一个 Topic 下多个 Tag,使得消息的分类和过滤更加灵活。

低延迟

Tag 过滤是在 Broker 端进行的,不会显著增加消息传递的延迟。

减少网络带宽

消费者只会接收到自己感兴趣的消息,减少了不必要的网络传输,从而节省了带宽。

缺点

单一维度过滤

Tag 只能提供单一维度的消息过滤,无法进行更复杂的多维度过滤。如果需要多维度过滤,需要结合其他机制(如消息属性)来实现。

有限的灵活性

Tag 的数量和种类在设计阶段需要规划好,灵活性有限。如果后期需要添加新的 Tag,可能需要重新设计和部署。

不支持复杂逻辑

Tag 过滤支持的逻辑较为简单,只能进行基于字符串匹配的过滤,无法支持复杂的过滤逻辑。

管理复杂性

随着系统规模的增大,Tag 的管理和维护可能变得复杂,尤其是在多个应用共享同一个 Topic 的情况下。

潜在的性能瓶颈

虽然 Tag 过滤在大多数场景下性能良好,但在极端情况下(如大量不同 Tag 的消息和高并发消费),可能会带来性能瓶颈。

适用场景

日志和监控

不同类型的日志和监控数据可以通过 Tag 进行分类和过滤。

电商系统

不同类型的订单、商品信息等可以通过 Tag 进行分类和过滤,消费者只处理自己感兴趣的消息。

金融系统

不同类型的交易、通知等可以通过 Tag 进行分类和过滤,提高系统的处理效率。

社交平台

不同类型的消息(如评论、点赞、私信等)可以通过 Tag 进行分类和过滤,提供更精准的消息推送。

总结

本文分析了 RocketMQ 的 Tag 功能,它在消息过滤和分类处理方面提供了极大的便利,适用于各种需要高效、低延迟消息传递的场景。然而,它也有一些局限性,如单一维度过滤、管理复杂性等。

在实际应用中,需要根据具体需求和系统设计,合理使用 Tag 功能,结合其他机制来实现更复杂的消息过滤和处理。

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

目录
相关文章
|
24天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
16天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
20天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2577 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
18天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
3天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
2天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
163 2
|
20天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1576 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
22天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
973 14
|
3天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
219 2
|
17天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
734 9