RocketMQ的消息查询

简介: RocketMQ的消息查询

按照MessageId查询消息
image.png
MsgId 总共 16 字节,包含消息存储主机地址(ip/port),消息 Commit Log offset。从 MsgId 中解析出 Broker 的地址和 Commit Log 的偏移地址,然后按照存储格式所在位置将消息 buffer 解析成一个完整的消息。
在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后,通过Remoting通信层发送(业务请求码:
VIEW_MESSAGE_BY_ID)。Broker使用QueryMessageProcessor,使用请求中的 commitLog offset和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。

按照Message Key查询消息
“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。索引文件的具体结构如下:
image.png
1.根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目, 例如图中所示 slotNum=5000000)。
2.根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)。
3.遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
4.Hash 冲突;
第一种,key 的 hash 值不同但模数相同,此时查询的时候会再比较一次 key 的 hash 值(每个索引项保存了 key 的 hash 值),过滤掉 hash 值不相等的项。
第二种,hash 值相等但 key 不等, 出于性能的考虑冲突的检测放到客户端处理(key 的原始值是存储在消息文件中的,避免对数据文件的解析), 客户端比较一次消息体的 key 是否相同。
5.存储;为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中),整个索引文件是定长的,结构也是固定的。

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; 
import org.apache.rocketmq.client.exception.MQBrokerException; 
import org.apache.rocketmq.client.exception.MQClientException; 
import org.apache.rocketmq.common.message.MessageExt; 
import org.apache.rocketmq.remoting.exception.RemotingException; 

public class QueryingMessageDemo { 
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { 
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_grp_09_01"); 
        
        consumer.setNamesrvAddr("node1:9876"); 
        consumer.start(); 
        
        MessageExt message = consumer.viewMessage("tp_demo_08", "0A4E00A7178878308DB150A780BB0000"); 
        System.out.println(message); 
        System.out.println(message.getMsgId()); 
        consumer.shutdown(); 
    } 
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 RocketMQ
rocketMq错误日志所在位置
rocketMq错误日志所在位置
303 0
|
SpringCloudAlibaba Java API
SpringCloudAliBaba篇之gateway:手把手教你搭建服务网关(下)
SpringCloudAliBaba篇之gateway:手把手教你搭建服务网关(下)
664 0
vscode 打开多个标签页
vscode 打开多个标签页
679 0
|
消息中间件 存储 Java
自顶向下学习 RocketMQ(九):回溯消费
回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。
自顶向下学习 RocketMQ(九):回溯消费
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
2756 1
|
前端开发 JavaScript 安全
入门Vue+.NET 8 Web Api记录(一)
入门Vue+.NET 8 Web Api记录(一)
555 5
|
数据采集 Web App开发 测试技术
如何避免反爬虫程序检测到爬虫行为?
这段内容介绍了几种避免被反爬虫程序检测的方法:通过调整请求频率并遵循网站规则来模拟自然访问;通过设置合理的User-Agent和其他请求头信息来伪装请求;利用代理IP和分布式架构来管理IP地址;以及采用Selenium等工具模拟人类的浏览行为,如随机点击和滚动页面,使爬虫行为更加逼真。这些技巧有助于降低被目标网站识别的风险。
|
缓存 UED
强缓存与协商缓存
强缓存与协商缓存
370 63