RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

简介: RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

image.png

RocketMQ使用教程相关系列 目录


目录

源代码说明

MessageExt的源码解析

继承于Message,Message源码解析

喝点毒鸡汤


源代码说明

MessageExt位于此包下:

package org.apache.rocketmq.common.message;

MessageExt的源码解析

参数说明在代码里

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;
    //记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
    private int queueId;
    //记录消息在Broker存盘大小
    private int storeSize;
    //记录在ConsumeQueue中的偏移
    private long queueOffset;
    //记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
    private int sysFlag;
    //消息创建时间,在Producer发送消息时设置
    private long bornTimestamp;
    //记录发送该消息的producer地址
    private SocketAddress bornHost;
    //消息存储时间
    private long storeTimestamp;
    //记录存储该消息的Broker地址
    private SocketAddress storeHost;
    //消息Id
    private String msgId;
    //记录消息在Broker中存储偏移
    private long commitLogOffset;
    //消息内容CRC校验值
    private int bodyCRC;
    //消息重试消费次数
    private int reconsumeTimes;
    //这个参数没看懂,知道的大佬分享下
    private long preparedTransactionOffset;
//省略get set
    @Override
    public String toString() {
        return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
            + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
            + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
            + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
            + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
            + ", toString()=" + super.toString() + "]";
    }
}

继承于Message,Message源码解析

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}1.public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}

喝点毒鸡汤

每天读点源码,进步一小步,成长一大步。

目录
打赏
0
0
0
0
29
分享
相关文章
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
180 4
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
166 3
RocketMQ 之 IoT 消息解析:物联网需要的消息技术
RocketMQ 5.0 是为应对物联网(IoT)场景而发布的云原生消息中间件,旨在解决 IoT 中大规模设备连接、数据处理和边缘计算的需求。
1289 66
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
186 0
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
364 2
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
1028 4
kde
|
5天前
|
Docker镜像加速指南:手把手教你配置国内镜像源
配置国内镜像源可大幅提升 Docker 拉取速度,解决访问 Docker Hub 缓慢问题。本文详解 Linux、Docker Desktop 配置方法,并提供测速对比与常见问题解答,附最新可用镜像源列表,助力高效开发部署。
kde
2920 7
国内如何安装和使用 Claude Code镜像教程 - Windows 用户篇
国内如何安装和使用 Claude Code镜像教程 - Windows 用户篇
544 0

推荐镜像

更多
  • DNS
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等

    登录插画

    登录以查看您的控制台资源

    管理云资源
    状态一览
    快捷访问