RocketMQ的消息查询

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: RocketMQ的消息查询

按照MessageId查询消息
image.png
MsgId 总共 16 字节,包含消息存储主机地址(ip/port),消息 Commit Log offset。从 MsgId 中解析出 Broker 的地址和 Commit Log 的偏移地址,然后按照存储格式所在位置将消息 buffer 解析成一个完整的消息。
在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后,通过Remoting通信层发送(业务请求码:
VIEW_MESSAGE_BY_ID)。Broker使用QueryMessageProcessor,使用请求中的 commitLog offset和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。

按照Message Key查询消息
“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。索引文件的具体结构如下:
image.png
1.根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目, 例如图中所示 slotNum=5000000)。
2.根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)。
3.遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
4.Hash 冲突;
第一种,key 的 hash 值不同但模数相同,此时查询的时候会再比较一次 key 的 hash 值(每个索引项保存了 key 的 hash 值),过滤掉 hash 值不相等的项。
第二种,hash 值相等但 key 不等, 出于性能的考虑冲突的检测放到客户端处理(key 的原始值是存储在消息文件中的,避免对数据文件的解析), 客户端比较一次消息体的 key 是否相同。
5.存储;为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中),整个索引文件是定长的,结构也是固定的。

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; 
import org.apache.rocketmq.client.exception.MQBrokerException; 
import org.apache.rocketmq.client.exception.MQClientException; 
import org.apache.rocketmq.common.message.MessageExt; 
import org.apache.rocketmq.remoting.exception.RemotingException; 

public class QueryingMessageDemo { 
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { 
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_grp_09_01"); 
        
        consumer.setNamesrvAddr("node1:9876"); 
        consumer.start(); 
        
        MessageExt message = consumer.viewMessage("tp_demo_08", "0A4E00A7178878308DB150A780BB0000"); 
        System.out.println(message); 
        System.out.println(message.getMsgId()); 
        consumer.shutdown(); 
    } 
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Apache C语言
消息队列 MQ产品使用合集之在Cluster部署模式下,使用dashboard无法查询到消费组信息,一般是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
100 0
|
5月前
|
消息中间件 网络性能优化
消息队列 MQ产品使用合集之通过MQTT控制台查询不到设备轨迹或消息轨迹是什么原因
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
6月前
|
物联网 API 网络性能优化
MQTT常见问题之没有权限查询如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
消息中间件 物联网 RocketMQ
MQTT常见问题之RocketMQ到MQTT的消息轨迹查询失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
物联网 Serverless
MQTT常见问题之通过mqtt控制台查询不到设备轨迹如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
消息中间件 存储 开发者
MQ 消息查询|学习笔记
快速学习 MQ 消息查询
839 0
|
消息中间件 RocketMQ
rocketMq消息查询
概述     最近有人问我知道rocketMq是怎么查询消息的,我发现我貌似回答不上来,所以抽空就把这块内容补充一下,主要是讲清楚根据key查询消息和根据msgId查询消息两块内容。
1133 0
|
24天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
65 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
65 7