目录
源代码说明
MessageExt位于此包下:
package org.apache.rocketmq.common.message;
MessageExt的源码解析
参数说明在代码里
public class MessageExt extends Message { private static final long serialVersionUID = 5720810158625748049L; //记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取 private int queueId; //记录消息在Broker存盘大小 private int storeSize; //记录在ConsumeQueue中的偏移 private long queueOffset; //记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识 private int sysFlag; //消息创建时间,在Producer发送消息时设置 private long bornTimestamp; //记录发送该消息的producer地址 private SocketAddress bornHost; //消息存储时间 private long storeTimestamp; //记录存储该消息的Broker地址 private SocketAddress storeHost; //消息Id private String msgId; //记录消息在Broker中存储偏移 private long commitLogOffset; //消息内容CRC校验值 private int bodyCRC; //消息重试消费次数 private int reconsumeTimes; //这个参数没看懂,知道的大佬分享下 private long preparedTransactionOffset; //省略get set @Override public String toString() { return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes=" + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset + ", toString()=" + super.toString() + "]"; } }
继承于Message,Message源码解析
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题 private String topic; //网络通信层标记 private int flag; /** *MIN_OFFSET:最小偏移 * MAX_OFFSET:最大偏移 * CONSUME_START_TIME:消费拉取时间 *UNIQ_KEY: * CLUSTER:集群 * WAIT: * TAGS:消息标签 *DELAY:延时级别 **/ private Map<String, String> properties; //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。 private byte[] body; //事务消息相关的事务编号 private String transactionId; //得到延时级别 public int getDelayTimeLevel() { String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL); if (t != null) { return Integer.parseInt(t); } return 0; } //设置延时级别 public void setDelayTimeLevel(int level) { this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); } //省略 get set @Override public String toString() { return "Message{" + "topic='" + topic + '\'' + ", flag=" + flag + ", properties=" + properties + ", body=" + Arrays.toString(body) + ", transactionId='" + transactionId + '\'' + '}'; } }1.public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题 private String topic; //网络通信层标记 private int flag; /** *MIN_OFFSET:最小偏移 * MAX_OFFSET:最大偏移 * CONSUME_START_TIME:消费拉取时间 *UNIQ_KEY: * CLUSTER:集群 * WAIT: * TAGS:消息标签 *DELAY:延时级别 **/ private Map<String, String> properties; //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。 private byte[] body; //事务消息相关的事务编号 private String transactionId; //得到延时级别 public int getDelayTimeLevel() { String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL); if (t != null) { return Integer.parseInt(t); } return 0; } //设置延时级别 public void setDelayTimeLevel(int level) { this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); } //省略 get set @Override public String toString() { return "Message{" + "topic='" + topic + '\'' + ", flag=" + flag + ", properties=" + properties + ", body=" + Arrays.toString(body) + ", transactionId='" + transactionId + '\'' + '}'; } }
喝点毒鸡汤
每天读点源码,进步一小步,成长一大步。