EQueue文件持久化消息关键点设计思路

简介:

要持久化的关键数据有三种

  1. 消息;
  2. 队列,队列中存放的是消息索引信息,即消息在文件中的物理位置(messageOffset)和在队列中的逻辑位置(queueOffset)的映射信息;
  3. 队列消费进度,表示当前队列中的消息消费到第几个了;

发送消息的设计

  1. producer将消息的二进制数据发送到broker;
  2. broker做的事情:
    • 单线程持久化消息到内存映射文件;
    • 将当前消息的索引信息放入缓冲区,可以使用disruptor的ringbuffer实现,单线程写,无锁。
    • 单线程从缓冲区读取消息索引信息,并将索引信息写入内存映射文件;
    • 消息的内存映射文件、消息索引的内存映射文件都定时刷新到磁盘,比如每隔1s刷新一次,可配置;
  3. broker将当前消息的索引信息放入缓冲区后,就立即返回了,然后producer就收到了消息发送的结果;

其他说明:

  1. 因为不可能用一个文件来保存所有的消息,所以肯定是用多个文件的方式。也就是说,无论是保存消息还是保存消息索引,都用多个文件。另外,由于队列有多个,所以每个队列都对应多个内存映射文件。队列文件的目录命名规则:rootPath / topic / queueId / queue mapped files
  2. broker在将消息的索引信息放入缓冲区时,要检查缓冲区是否到达一定的水位,比如ringbuffer总大小100W个槽,假如水位是80%,那就是当现在ringbuffer中可用的槽不到20%时,应该要做流控,比如sleep 100s;理论上应该不会到达水位,因为写消息索引肯定比写消息本身要快;

消费消息的设计

  1. consumer告诉broker当前需要拉取哪个topic下的哪个队列里的第几个位置(queueOffset)开始的消息,并告诉要最多拉取多少个消息;
  2. broker根据topic和queueId找到对应的队列;
  3. 根据queueOffset从队列拿到消息在文件中的物理位置,即messageOffset;
  4. 根据messageOffset从消息的内存映射文件获取消息二进制数据;
  5. 将消息二进制数据写入临时的内存流里,该内存流里包含了所有要返回的消息;
  6. 消息拉取数量达到要求或没有新的消息可以拉取后,将内存流对应的二进制数据返回给consumer;
  7. consumer解析二进制数据,得到所有的消息对象;

broker定时清理过期的消息和消息索引

  1. 每隔10s扫描是否有过期的消息文件,过期时间可配置,比如三天;扫描时,发现文件的最后修改时间是3天前,则删除;
  2. 每隔10s扫描是否有过期的消息索引文件,判断是否过期的依据是扫描每个消息索引文件,判断该文件中的最后一个消息索引的messageOffset是否比最小的messageOffset还要小;如果小,就说明这个消息索引文件已经无意义了,可以删除;

broker启动时的逻辑

  1. 扫描磁盘上所有的消息的存储文件,为每个文件建立内存映射;
  2. 扫描磁盘上所有的队列(消息索引)的存储文件,为每个文件建立内存映射;
  3. 对每个队列,预恢复几个文件(比如最后的3个文件)的数据到内存,剩余的用到时再恢复;
  4. 同理,对于存储消息的文件,也预恢复几个(比如最后的3个文件)到内存;一般大部分消息者只要消费进度不是太慢,总是应该已经赶上了最后那三个文件了;
  5. 关于异常关闭broker时的逻辑,暂时还没想清楚,还需要再细思;
目录
相关文章
|
1月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
69 12
|
2月前
|
消息中间件 存储 分布式计算
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
26 1
|
7月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
108 0
|
存储 Kubernetes 应用服务中间件
应用存储和持久化数据卷:核心知识(二)|学习笔记
快速学习应用存储和持久化数据卷:核心知识(二)
127 0
应用存储和持久化数据卷:核心知识(二)|学习笔记
|
运维 Java 数据库
如何实现最终一致性,有哪些解决方案
如何实现最终一致性,有哪些解决方案
|
存储 XML Java
何为消息持久化?
持久化(Persistence),即把数据(如内存中的对象)保存到可永久保存的存储设备中(如磁盘)。持久化的主要应用是将内存中的对象存储在关系型的数据库中,当然也可以存储在磁盘文件中、XML数据文件中等等。
125 0
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
429 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
|
消息中间件 存储 缓存
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
718 1
5张图带你理解 RocketMQ 顺序消息实现机制
GoFrame如何实现顺序性校验
这篇文章填上之前留的坑,我们以map校验举例:
117 0