【消息队列开发】 实现消息持久化

简介: 【消息队列开发】 实现消息持久化

🍃前言

本次开发目标,实现消息持久化

🍀消息存储格式设计

在前面最开始博主的设计就为将消息存储在硬盘上,那么我们应该以怎样的格式存储在硬盘上面呢?

我们的消息是需要依附于队列的,因此我们在存储的时候,可以将消息按照队列的维度展开。

在前面数据创建的时候,我们有了一个data目录(meta.db就在这个目录中)

我们可以在data中创建一些子目录,子目录的名字就是队列名。

然后我们再每个队列的子目录下,再分配两个文件,来存储消息

  1. 第一个文件:queue_data这里保存消息的内容
  2. 第二个文件:queue_stat 这里用来保存消息的统计信息(具体统计什么信息,后买你会说到)

🚩queue_data文件设计

关于queue_data.txt文件的设计

我是这样设计的,由于queue_data文件是一个二进制格式的文件。

所以我做出一下约定,这个文件包含若干个消息,每个消息都已二进制的方式进行存储。那么每个消息约定有以下几部分组成

而相应的二进制数据模块又由以下几部分组成

起初在设计消息类的时候,还涉及两个元素offsetBeg,offsetEnd,没有让它们序列化

此时呢,这两个属性就不会跟着Message进入硬盘中。而是在内存中进行存储,方便随时找到内存中的Message对象,就能找到对应的Message对象了

如此以来我们的queue_data文件设计就完成了

🚩queue_stat文件设计

那么queue_stat文件是用来干嘛的呢?

再上面的queue_data文件中有一个属性叫isValid,代表的是 是否持久化,也就是需要进行删除

对于Broker Server来说消息是需要新增,也需要删除的.

生产者生产-一个消息过来,就得新增这个消息

消费者把这个消息消费掉,这个消息就得删除.

新增和删除,对于内存中来说,好办 (直接使用一一些集合类)

但是在文件。上就麻烦了。新增消息可以直接把新的消息追加到文件末尾

删除消息不好搞文件可以视为是一个"顺序表"这样的结构如果直接

删除中间元素,就需要涉及到类似于"顺序表搬运",这样的操作,效率是非常低的

因此,使用这种搬运的方式删除,是不合适的

因此我们使用逻辑删除,是比较合适的

  • isValid为1,有效
  • isValid为0,无效

但是呢,随着时间的推移,这些文件可能越来越大,并且可能大部分都是无效消息,针对这种情况,就需要考虑对当前文件进行垃圾回收

关于垃圾回收,博主这里使用的复制算法

我们只需要直接遍历原有的消息数据文件,把所有的有效的数据拷贝到一个新的文件中,再把之前的整个旧文件都删除就好了

那我们什么时候执行垃圾回收呢?

此处我们这里做出这样的约定,当总数目超过2000(这个可以随意定义),并且有效消息的数目低于总消息数目的50%(随意定义),就触发垃圾回收

如此以来,我们queue_stat文件的作用,就体现出来,

  • 用来记录总消息数目与有效消息数目

queue_stat文件定义为文本格式,只存一行数据,一行有两列:

  • 第一列 是总的消息数目
  • 第二列 是有效消息数目
  • 两者之间使用 \t 分割
  • 形如2000\t1888

如此以来,queue_stat文件也就此完成了

🚩拓展

如果整个队列中,消息特别特别多,而且都是有效消息

此时就会导致整个消息的数据库文件特别大,后续针对这个文件的操作,成本也会上移

假如这个文件非常大(10G),触发一次GC,整体耗时就会非常高

虽然博主这里没有解决该问题,但是可以提供以下思路

  1. 需要专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息数目是多少,无效消息是多少.
  2. 设计策略,什么时候触发文件的拆分.什么时候触发文件的合并

🎄实现统计文件(queue_stat)的读写

统计文件读写来说相对较为简单,所以这里就不进行讲解了,相关注解已包含再代码中,实现代码如下:

// 定义一个内部类, 来表示该队列的统计信息
// 有限考虑使用 static, 静态内部类.
static public class Stat {
    // 此处直接定义成 public, 就不再搞 get set 方法了.
    // 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.
    public int totalCount;  // 总消息数量
    public int validCount;  // 有效消息数量
}
// 预定消息文件所在的目录和文件名
// 这个方法, 用来获取到指定队列对应的消息文件所在路
private String getQueueDir(String queuename) {
    return ".data" + queuename;
}
// 这个方法用来获取该队列的消息数据文件路径
// 注意, 二进制文件, 使用 txt 作为后缀, 不太合适. txt 一般表示文本. 此处咱们也就不改.
// .bin / .dat
private String getQueueDataPath(String queueName) {
    return getQueueDir(queueName) + "/queue_data.txt";
}
// 这个方法用来获取该队列的消息统计文件路径
private String getQueueStatPath(String queueName) {
    return getQueueDir(queueName) + "/queue_stat.txt";
}
private Stat readStat(String queuename) {
    // 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容
    Stat stat = new Stat();
    try(InputStream inputStream = new FileInputStream(getQueueStatPath(queuename))) {
        Scanner scanner = new Scanner(inputStream);
        stat.totalCount = scanner.nextInt();
        stat.validCount = scanner.nextInt();
        return stat;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
private void writeStat(String queueName, Stat stat) {
    // 使用 PrintWrite 来写文件.
    // OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
    try(OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
        PrintWriter printWriter = new PrintWriter(outputStream);
        printWriter.print(stat.totalCount + "/t" + stat.validCount);
        printWriter.flush();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
// 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {
    // 1. 先创建队列对应的消息目录
    File baseDir = new File(getQueueDir(queueName));
    if (!baseDir.exists()) {
        // 不存在, 就创建这个目录
        boolean ok = baseDir.mkdirs();
        if (!ok) {
            throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());
        }
    }
    // 2. 创建队列数据文件
    File queueDataFile = new File(getQueueDataPath(queueName));
    if (!queueDataFile.exists()) {
        boolean ok = queueDataFile.createNewFile();
        if (!ok) {
            throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());
        }
    }
    // 3. 创建消息统计文件
    File queueStatFile = new File(getQueueStatPath(queueName));
    if (!queueStatFile.exists()) {
        boolean ok = queueStatFile.createNewFile();
        if (!ok) {
            throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());
        }
    }
    // 4. 给消息统计文件, 设定初始值. 0\t0
    Stat stat = new Stat();
    stat.totalCount = 0;
    stat.validCount = 0;
    writeStat(queueName, stat);
}

除此之外呢,博主还提供了两个方法,作用分别为

  • 删除队列的文件和目录
  • 检查队列的目录和文件是否存在.

实现如下:

public void destroyQueueFiles(String queueName) throws IOException {
    // 先删除里面的文件, 再删除目录.
    File queueDataFile = new File(getQueueDataPath(queueName));
    boolean ok1 = queueDataFile.delete();
    File queueStatFile = new File(getQueueStatPath(queueName));
    boolean ok2 = queueStatFile.delete();
    File baseDir = new File(getQueueDir(queueName));
    boolean ok3 = baseDir.delete();
    if (!ok1 || !ok2 || !ok3) {
        // 有任意一个删除失败, 都算整体删除失败.
        throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath());
    }
}
// 检查队列的目录和文件是否存在.
// 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)
public boolean checkFilesExits(String queueName) {
    // 判定队列的数据文件和统计文件是否都存在!!
    File queueDataFile = new File(getQueueDataPath(queueName));
    if (!queueDataFile.exists()) {
        return false;
    }
    File queueStatFile = new File(getQueueStatPath(queueName));
    if (!queueStatFile.exists()) {
        return false;
    }
    return true;
}

⭕总结

关于《【消息队列开发】 实现消息持久化》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章
|
7月前
|
消息中间件 Java 数据库
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
|
7月前
|
消息中间件 存储 测试技术
【消息队列开发】 实现MemoryDataCenterTests类——测试管理内存数据
【消息队列开发】 实现MemoryDataCenterTests类——测试管理内存数据
|
7月前
|
消息中间件 存储 安全
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
|
7月前
|
消息中间件 安全
【消息队列开发】 虚拟主机设计——操作绑定
【消息队列开发】 虚拟主机设计——操作绑定
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
31 2
|
7月前
|
消息中间件 网络协议 Java
【消息队列开发】 实现BrokerServer类——本体服务器
【消息队列开发】 实现BrokerServer类——本体服务器
|
5月前
|
消息中间件 存储 算法
现代消息队列与云存储问题之消息队列如何处理持久化
现代消息队列与云存储问题之消息队列如何处理持久化
|
7月前
|
消息中间件 Java
【消息队列开发】 实现消费者订阅消息
【消息队列开发】 实现消费者订阅消息
|
7月前
|
消息中间件 存储 Java
【消息队列开发】 实现DiskDataCenter类——管理所有硬盘上的数据
【消息队列开发】 实现DiskDataCenter类——管理所有硬盘上的数据
|
7月前
|
消息中间件
【消息队列开发】 实现Router类——交换机的转发规则
【消息队列开发】 实现Router类——交换机的转发规则