RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

image.png

RocketMQ使用教程相关系列 目录


目录

源代码说明

MessageExt的源码解析

继承于Message,Message源码解析

喝点毒鸡汤


源代码说明

MessageExt位于此包下:

package org.apache.rocketmq.common.message;

MessageExt的源码解析

参数说明在代码里

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;
    //记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
    private int queueId;
    //记录消息在Broker存盘大小
    private int storeSize;
    //记录在ConsumeQueue中的偏移
    private long queueOffset;
    //记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
    private int sysFlag;
    //消息创建时间,在Producer发送消息时设置
    private long bornTimestamp;
    //记录发送该消息的producer地址
    private SocketAddress bornHost;
    //消息存储时间
    private long storeTimestamp;
    //记录存储该消息的Broker地址
    private SocketAddress storeHost;
    //消息Id
    private String msgId;
    //记录消息在Broker中存储偏移
    private long commitLogOffset;
    //消息内容CRC校验值
    private int bodyCRC;
    //消息重试消费次数
    private int reconsumeTimes;
    //这个参数没看懂,知道的大佬分享下
    private long preparedTransactionOffset;
//省略get set
    @Override
    public String toString() {
        return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
            + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
            + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
            + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
            + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
            + ", toString()=" + super.toString() + "]";
    }
}

继承于Message,Message源码解析

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}1.public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}

喝点毒鸡汤

每天读点源码,进步一小步,成长一大步。

目录
相关文章
|
5月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
107 4
|
3月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
103 12
|
7月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
72 3
|
3月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
61 2
|
8月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
63 0
|
4月前
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
162 0
RocketMQ消息重试机制解析!
|
5月前
|
网络协议 网络虚拟化
接收网络包的过程——从硬件网卡解析到IP
【9月更文挑战第18天】这段内容详细描述了网络包接收过程中机制。当网络包触发中断后,内核处理完这批网络包,会进入主动轮询模式,持续处理后续到来的包,直至处理间隙返回其他任务,从而减少中断次数,提高处理效率。此机制涉及网卡驱动初始化时注册轮询函数,通过软中断触发后续处理,并逐步深入内核网络协议栈,最终到达TCP层。整个接收流程分为多个层次,包括DMA技术存入Ring Buffer、中断通知CPU、软中断处理、以及进入内核网络协议栈等多个步骤。
|
4月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
72 0
EMQ
|
8月前
|
安全 开发工具 数据安全/隐私保护
MQTT 5.0 报文解析 06:AUTH
MQTT 5.0 引入了增强认证特性,它使 MQTT 除了简单密码认证和 Token 认证以外,还能够支持质询/响应风格的认证。为了实现这一点,它在原先 CONNECT 和 CONNACK 报文的基础上,又引入了 AUTH 报文来实现任意多次的认证数据交换,以支持各种不同类型的认证机制,例如 SCRAM、Kerberos 认证等等。
EMQ
354 14
MQTT 5.0 报文解析 06:AUTH

推荐镜像

更多