RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

简介: RocketMQ源码解析:手把手教老婆看懂consumer接收到的MessageExt

image.png

RocketMQ使用教程相关系列 目录


目录

源代码说明

MessageExt的源码解析

继承于Message,Message源码解析

喝点毒鸡汤


源代码说明

MessageExt位于此包下:

package org.apache.rocketmq.common.message;

MessageExt的源码解析

参数说明在代码里

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;
    //记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
    private int queueId;
    //记录消息在Broker存盘大小
    private int storeSize;
    //记录在ConsumeQueue中的偏移
    private long queueOffset;
    //记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
    private int sysFlag;
    //消息创建时间,在Producer发送消息时设置
    private long bornTimestamp;
    //记录发送该消息的producer地址
    private SocketAddress bornHost;
    //消息存储时间
    private long storeTimestamp;
    //记录存储该消息的Broker地址
    private SocketAddress storeHost;
    //消息Id
    private String msgId;
    //记录消息在Broker中存储偏移
    private long commitLogOffset;
    //消息内容CRC校验值
    private int bodyCRC;
    //消息重试消费次数
    private int reconsumeTimes;
    //这个参数没看懂,知道的大佬分享下
    private long preparedTransactionOffset;
//省略get set
    @Override
    public String toString() {
        return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
            + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
            + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
            + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
            + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
            + ", toString()=" + super.toString() + "]";
    }
}

继承于Message,Message源码解析

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}1.public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题
    private String topic;
    //网络通信层标记
    private int flag;
    /**
     *MIN_OFFSET:最小偏移
     * MAX_OFFSET:最大偏移
     * CONSUME_START_TIME:消费拉取时间
     *UNIQ_KEY:
     * CLUSTER:集群
     * WAIT:
     * TAGS:消息标签
     *DELAY:延时级别
     **/
    private Map<String, String> properties;
    //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
    private byte[] body;
    //事务消息相关的事务编号
    private String transactionId;
   //得到延时级别
    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }
        return 0;
    }
//设置延时级别
    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
//省略 get set
    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}

喝点毒鸡汤

每天读点源码,进步一小步,成长一大步。

目录
相关文章
|
3天前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
16 0
EMQ
|
18天前
|
运维 Linux 网络性能优化
MQTT 5.0 报文解析 05:DISCONNECT
在 MQTT 中,客户端和服务端可以在断开网络连接前向对端发送一个 DISCONNECT 报文,来指示连接关闭的原因。客户端发送的 DISCONNECT 报文还可以影响服务端在连接断开后的行为,例如是否发送遗嘱消息,是否更新会话过期间隔。
EMQ
26 0
MQTT 5.0 报文解析 05:DISCONNECT
EMQ
|
1月前
|
Linux 网络性能优化
MQTT 5.0 报文解析 03:SUBSCRIBE 与 UNSUBSCRIBE
在 MQTT 中,SUBSCRIBE 报文用于发起订阅请求,SUBACK 报文用于返回订阅结果。而 UNSUBSCRIBE 和 UNSUBACK 报文则在取消订阅时使用。相比于取消订阅,订阅操作更加常用。不过在本文中,我们仍然会一并介绍订阅与取消订阅报文的结构与组成。
EMQ
264 3
MQTT 5.0 报文解析 03:SUBSCRIBE 与 UNSUBSCRIBE
|
1天前
|
消息中间件 自然语言处理 负载均衡
RabbitMQ揭秘:轻量级消息队列的优缺点全解析
**RabbitMQ简介** RabbitMQ是源自电信行业的消息中间件,支持AMQP协议,提供轻量、快速且易于部署的解决方案。它拥有灵活的路由配置,广泛的语言支持,适用于异步处理、负载均衡、日志收集和微服务通信等场景。然而,当面临大量消息堆积或高吞吐量需求时,性能可能会下降,并且扩展和开发成本相对较高。
14 0
EMQ
|
12天前
|
安全 开发工具 数据安全/隐私保护
MQTT 5.0 报文解析 06:AUTH
MQTT 5.0 引入了增强认证特性,它使 MQTT 除了简单密码认证和 Token 认证以外,还能够支持质询/响应风格的认证。为了实现这一点,它在原先 CONNECT 和 CONNACK 报文的基础上,又引入了 AUTH 报文来实现任意多次的认证数据交换,以支持各种不同类型的认证机制,例如 SCRAM、Kerberos 认证等等。
EMQ
225 2
MQTT 5.0 报文解析 06:AUTH
|
26天前
|
消息中间件 小程序 网络性能优化
蓝易云 - 直播小程序源码有用的协议知识:MQTT协
在直播小程序源码中,MQTT协议可以用于实现实时消息推送,如弹幕、聊天消息、礼物信息等。通过使用MQTT协议,可以确保消息的实时性和可靠性,从而提高用户体验。
55 0
EMQ
|
26天前
|
开发工具
MQTT 5.0 报文解析 04:PINGREQ 与 PINGRESP
除了用于连接、发布和订阅的控制报文,MQTT 还有一类报文用于在客户端和服务端之间模拟心跳,以达到保持连接的目的,它们分别是 PINGREQ 报文和 PINGRESP 报文,我们通常也会称它们为心跳报文。
EMQ
148 0
MQTT 5.0 报文解析 04:PINGREQ 与 PINGRESP
|
1月前
|
消息中间件 Java RocketMQ
MQ产品使用合集之在同一个 Java 进程内建立三个消费对象并设置三个消费者组订阅同一主题和标签的情况下,是否会发生其中一个消费者组无法接收到消息的现象
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
EMQ
|
1月前
|
JSON Linux 网络性能优化
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK
本文将介绍在 MQTT 中用于传递应用消息的 PUBLISH 报文以及它的响应报文。不管是客户端向服务端发布消息,还是服务端向订阅端转发消息,都需要使用 PUBLISH 报文。决定消息流向的主题、消息的实际内容和 QoS 等级,都包含在 PUBLISH 报文中。
EMQ
118 1
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK
|
1月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,使用了RocketMQ的触发器,并且发送和接收消息都没有问题,但是消息轨迹中没有体现出来消费的情况如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。

推荐镜像

更多