🍃前言
本次开发目标,实现消息持久化
🍀消息存储格式设计
在前面最开始博主的设计就为将消息存储在硬盘上,那么我们应该以怎样的格式存储在硬盘上面呢?
我们的消息是需要依附于队列的,因此我们在存储的时候,可以将消息按照队列的维度展开。
在前面数据创建的时候,我们有了一个data目录(meta.db就在这个目录中)
我们可以在data中创建一些子目录,子目录的名字就是队列名。
然后我们再每个队列的子目录下,再分配两个文件,来存储消息
- 第一个文件:queue_data这里保存消息的内容
- 第二个文件:queue_stat 这里用来保存消息的统计信息(具体统计什么信息,后买你会说到)
🚩queue_data文件设计
关于queue_data.txt文件的设计
我是这样设计的,由于queue_data文件是一个二进制格式的文件。
所以我做出一下约定,这个文件包含若干个消息,每个消息都已二进制的方式进行存储。那么每个消息约定有以下几部分组成
而相应的二进制数据模块又由以下几部分组成
起初在设计消息类的时候,还涉及两个元素offsetBeg,offsetEnd,没有让它们序列化
此时呢,这两个属性就不会跟着Message进入硬盘中。而是在内存中进行存储,方便随时找到内存中的Message对象,就能找到对应的Message对象了
如此以来我们的queue_data文件设计就完成了
🚩queue_stat文件设计
那么queue_stat文件是用来干嘛的呢?
再上面的queue_data文件中有一个属性叫isValid,代表的是 是否持久化,也就是需要进行删除
对于Broker Server来说消息是需要新增,也需要删除的.
生产者生产-一个消息过来,就得新增这个消息
消费者把这个消息消费掉,这个消息就得删除.
新增和删除,对于内存中来说,好办 (直接使用一一些集合类)
但是在文件。上就麻烦了。新增消息可以直接把新的消息追加到文件末尾
删除消息不好搞文件可以视为是一个"顺序表"这样的结构如果直接
删除中间元素,就需要涉及到类似于"顺序表搬运",这样的操作,效率是非常低的
因此,使用这种搬运的方式删除,是不合适的
因此我们使用逻辑删除,是比较合适的
- isValid为1,有效
- isValid为0,无效
但是呢,随着时间的推移,这些文件可能越来越大,并且可能大部分都是无效消息,针对这种情况,就需要考虑对当前文件进行垃圾回收
关于垃圾回收,博主这里使用的复制算法
我们只需要直接遍历原有的消息数据文件,把所有的有效的数据拷贝到一个新的文件中,再把之前的整个旧文件都删除就好了
那我们什么时候执行垃圾回收呢?
此处我们这里做出这样的约定,当总数目超过2000(这个可以随意定义),并且有效消息的数目低于总消息数目的50%(随意定义),就触发垃圾回收
如此以来,我们queue_stat文件的作用,就体现出来,
- 用来记录总消息数目与有效消息数目
queue_stat文件定义为文本格式,只存一行数据,一行有两列:
- 第一列 是总的消息数目
- 第二列 是有效消息数目
- 两者之间使用 \t 分割
- 形如2000\t1888
如此以来,queue_stat文件也就此完成了
🚩拓展
如果整个队列中,消息特别特别多,而且都是有效消息
此时就会导致整个消息的数据库文件特别大,后续针对这个文件的操作,成本也会上移
假如这个文件非常大(10G),触发一次GC,整体耗时就会非常高
虽然博主这里没有解决该问题,但是可以提供以下思路
- 需要专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息数目是多少,无效消息是多少.
- 设计策略,什么时候触发文件的拆分.什么时候触发文件的合并
🎄实现统计文件(queue_stat)的读写
统计文件读写来说相对较为简单,所以这里就不进行讲解了,相关注解已包含再代码中,实现代码如下:
// 定义一个内部类, 来表示该队列的统计信息 // 有限考虑使用 static, 静态内部类. static public class Stat { // 此处直接定义成 public, 就不再搞 get set 方法了. // 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了. public int totalCount; // 总消息数量 public int validCount; // 有效消息数量 } // 预定消息文件所在的目录和文件名 // 这个方法, 用来获取到指定队列对应的消息文件所在路 private String getQueueDir(String queuename) { return ".data" + queuename; } // 这个方法用来获取该队列的消息数据文件路径 // 注意, 二进制文件, 使用 txt 作为后缀, 不太合适. txt 一般表示文本. 此处咱们也就不改. // .bin / .dat private String getQueueDataPath(String queueName) { return getQueueDir(queueName) + "/queue_data.txt"; } // 这个方法用来获取该队列的消息统计文件路径 private String getQueueStatPath(String queueName) { return getQueueDir(queueName) + "/queue_stat.txt"; } private Stat readStat(String queuename) { // 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容 Stat stat = new Stat(); try(InputStream inputStream = new FileInputStream(getQueueStatPath(queuename))) { Scanner scanner = new Scanner(inputStream); stat.totalCount = scanner.nextInt(); stat.validCount = scanner.nextInt(); return stat; } catch (IOException e) { e.printStackTrace(); } return null; } private void writeStat(String queueName, Stat stat) { // 使用 PrintWrite 来写文件. // OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的. try(OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) { PrintWriter printWriter = new PrintWriter(outputStream); printWriter.print(stat.totalCount + "/t" + stat.validCount); printWriter.flush(); } catch (IOException e) { e.printStackTrace(); } } // 创建队列对应的文件和目录 public void createQueueFiles(String queueName) throws IOException { // 1. 先创建队列对应的消息目录 File baseDir = new File(getQueueDir(queueName)); if (!baseDir.exists()) { // 不存在, 就创建这个目录 boolean ok = baseDir.mkdirs(); if (!ok) { throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath()); } } // 2. 创建队列数据文件 File queueDataFile = new File(getQueueDataPath(queueName)); if (!queueDataFile.exists()) { boolean ok = queueDataFile.createNewFile(); if (!ok) { throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath()); } } // 3. 创建消息统计文件 File queueStatFile = new File(getQueueStatPath(queueName)); if (!queueStatFile.exists()) { boolean ok = queueStatFile.createNewFile(); if (!ok) { throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath()); } } // 4. 给消息统计文件, 设定初始值. 0\t0 Stat stat = new Stat(); stat.totalCount = 0; stat.validCount = 0; writeStat(queueName, stat); }
除此之外呢,博主还提供了两个方法,作用分别为
- 删除队列的文件和目录
- 检查队列的目录和文件是否存在.
实现如下:
public void destroyQueueFiles(String queueName) throws IOException { // 先删除里面的文件, 再删除目录. File queueDataFile = new File(getQueueDataPath(queueName)); boolean ok1 = queueDataFile.delete(); File queueStatFile = new File(getQueueStatPath(queueName)); boolean ok2 = queueStatFile.delete(); File baseDir = new File(getQueueDir(queueName)); boolean ok3 = baseDir.delete(); if (!ok1 || !ok2 || !ok3) { // 有任意一个删除失败, 都算整体删除失败. throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath()); } } // 检查队列的目录和文件是否存在. // 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化) public boolean checkFilesExits(String queueName) { // 判定队列的数据文件和统计文件是否都存在!! File queueDataFile = new File(getQueueDataPath(queueName)); if (!queueDataFile.exists()) { return false; } File queueStatFile = new File(getQueueStatPath(queueName)); if (!queueStatFile.exists()) { return false; } return true; }
⭕总结
关于《【消息队列开发】 实现消息持久化》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下