【消息队列开发】 实现消息垃圾回收

简介: 【消息队列开发】 实现消息垃圾回收

🍃前言

本次开发任务:对硬盘中存储的无用的消息数据进行垃圾回收

🎋准备工作

在书写垃圾回收的正式方法之前,我们希望有一个方法可以来判断当前内存是否需要垃圾回收

根据博主在前面开发垃圾回收的规定,我们规定总消息数达到2000,无效消息数量少于一半,此时我们再进行垃圾回收

代码实现如下:

// 检查当前是否要针对该队列的消息数据文件进行 GC
public boolean checkGC(String queueName) {
    // 判定是否要 GC, 是根据总消息数和有效消息数. 这两个值都是在 消息统计文件 中的.
    Stat stat = readStat(queueName);
    if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {
        return true;
    }
    return false;
}

除此之外我们使用的是回收算法是复制算法

所以我们再创建一个新的方法,用于创建一个新的File对象,用来存放有效数据

private String getQueueDataNewPath(String queueName) {
    return getQueueDir(queueName) + "/queue_data_new.txt";
}

🎍具体实现

大致实现思路就为,准备一个行的文件名字就是 queue_data_new.txt

把之前消息数据文件中的有效消息都读出来, 写到新的文件中.

删除旧的文件, 再把新的文件改名回 queue_data.txt

最后更新统计文件即可

🚩创建一个新文件

在创建一个新文件之前,我们还需要做一件事儿

由于我们整个GC的时间可能比较长,所以我们希望在这整个方法刚开始进行记录一下当前时间,方法最后记录一下时间,利用两者差值计算整个GC的时间

接下来我们调用上述方法进行创建新文件

这里我们要进行两个判断

  • 判断一:当前创建的File对象是否存在,若存在,说明上次GC失败了,这里我们需要抛出异常
  • 判断二:判断文件是否创建成功

🚩读取有效对象

接下来我们从旧的文件读取所有有效对象,这时候我们直接调用我们前面用来写入内存的方法即可

🚩把有效消息写入新文件中

这个逻辑其实前面也有,但是我们这里再重新写一遍即可

对读取有效消息对象链表进行遍历,然后一个个写入文件中

实现如下:

🚩以旧换新

这里需要做的操作就是

删除旧的文件,然后将新文件重命名为旧文件名

这里也做两个判断:

  • 判断一:旧的文件是否删除成功
  • 判断二:是否重命名成功

🚩更新统计文件

最后不要忘记更新我们的统计文件

并在更新后,记录结束时间,并计算出该操作的具体耗时

🚩特别注意

我们载进行该操作的时候不希望受到其他操作的干扰,所以我们需要将整个操作进行加锁

加锁对象就为当前队列

🚩完整代码

// 通过这个方法, 真正执行消息数据文件的垃圾回收操作.
// 使用复制算法来完成.
// 创建一个新的文件, 名字就是 queue_data_new.txt
// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.
// 删除旧的文件, 再把新的文件改名回 queue_data.txt
// 同时要记得更新消息统计文件.
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
    // 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.
    synchronized (queue) {
        // 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.
        long gcBeg = System.currentTimeMillis();
        // 1. 创建一个新的文件
        File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
        if (queueDataNewFile.exists()) {
            // 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.
            throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());
        }
        boolean ok = queueDataNewFile.createNewFile();
        if (!ok) {
            throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
        }
        // 2. 从旧的文件中, 读取出所有的有效消息对象了. (这个逻辑直接调用上述方法即可, 不必重新写了)
        LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
        // 3. 把有效消息, 写入到新的文件中.
        try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
            try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                for (Message message : messages) {
                    byte[] buffer = BinaryTool.toBytes(message);
                    // 先写四个字节消息的长度
                    dataOutputStream.writeInt(buffer.length);
                    dataOutputStream.write(buffer);
                }
            }
        }
        // 4. 删除旧的数据文件, 并且把新的文件进行重命名
        File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
        ok = queueDataOldFile.delete();
        if (!ok) {
            throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
        }
        // 把 queue_data_new.txt => queue_data.txt
        ok = queueDataNewFile.renameTo(queueDataOldFile);
        if (!ok) {
            throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
                    + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
        }
        // 5. 更新统计文件
        Stat stat = readStat(queue.getName());
        stat.totalCount = messages.size();
        stat.validCount = messages.size();
        writeStat(queue.getName(), stat);
        long gcEnd = System.currentTimeMillis();
        System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="
                + (gcEnd - gcBeg) + "ms");
    }
}

⭕总结

关于《【消息队列开发】 实现消息垃圾回收》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章
|
7月前
|
消息中间件 Java 数据库
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
|
7月前
|
消息中间件 存储 测试技术
【消息队列开发】 实现MemoryDataCenterTests类——测试管理内存数据
【消息队列开发】 实现MemoryDataCenterTests类——测试管理内存数据
|
7月前
|
消息中间件 存储 安全
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
|
7月前
|
消息中间件 安全
【消息队列开发】 虚拟主机设计——操作绑定
【消息队列开发】 虚拟主机设计——操作绑定
|
7月前
|
消息中间件 网络协议 Java
【消息队列开发】 实现BrokerServer类——本体服务器
【消息队列开发】 实现BrokerServer类——本体服务器
|
6月前
|
监控 Java 运维
开发与运维收集问题之jstat命令查看JVM垃圾回收情况如何解决
开发与运维收集问题之jstat命令查看JVM垃圾回收情况如何解决
66 1
|
6月前
|
Java 运维
开发与运维技术问题之ava对象头压缩技术支持所有的Java垃圾回收器如何解决
开发与运维技术问题之ava对象头压缩技术支持所有的Java垃圾回收器如何解决
46 1
|
7月前
|
消息中间件 Java
【消息队列开发】 实现消费者订阅消息
【消息队列开发】 实现消费者订阅消息
|
7月前
|
消息中间件 存储 Java
【消息队列开发】 实现DiskDataCenter类——管理所有硬盘上的数据
【消息队列开发】 实现DiskDataCenter类——管理所有硬盘上的数据
|
7月前
|
消息中间件
【消息队列开发】 实现Router类——交换机的转发规则
【消息队列开发】 实现Router类——交换机的转发规则