【消息队列开发】 把消息写入文件

简介: 【消息队列开发】 把消息写入文件

🍃前言

本次开发任务

  • 实现消息序列化与反序列化
  • 把消息写入文件

🎋实现消息序列化与反序列化

对于序列化与反序列化的模块

在最开始设计时,我就已经将它归入公共模块,所以我们common文件路径下创建BinaryTool类来进行实现

消息序列化:就是把一个对象(结构化的数据)转成一个字符串或者字符数组…

可能很多人都会有一个疑问,为什么要进行序列化呢?

  1. 序列化后,方便存储和运输
  2. 我们需要把消息通过文件的方式进行存储,但是呢,文件只能存储字符串或二进制数据,不能直接存对象

所以我们要进行序列化

这里由于Message 里面存储的是二进制数据,不太方便 JSON 进行序列化

所以这里,直接使用二进制的序列化方式,针对 Message 对象进行序列化

针对二进制序列化,也有很多解决方案

  1. java标准库提供了序列化方案 ObjectInputStream 和 ObjectOutputStream
  2. Hessian也是一种解决方案
  3. protobuffer

这里咱们使用java标准库提供的序列化方案 ObjectInputStream 和 ObjectOutputStream

具体实现如下:

// 下列的逻辑, 并不仅仅是 Message, 其他的 Java 中的对象, 也是可以通过这样的逻辑进行序列化和反序列化的.
// 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.
public class BinaryTool {
    // 把一个对象序列化成一个字节数组
    public static byte[] toBytes(Object object) throws IOException {
        // 这个流对象相当于一个变长的字节数组.
        // 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
                // 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到
                // ObjectOutputStream 中.
                // 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream,
                // 最终结果就写入到 ByteArrayOutputStream 里了
                objectOutputStream.writeObject(object);
            }
            // 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }
    // 把一个字节数组, 反序列化成一个对象
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
                // 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
                object = objectInputStream.readObject();
            }
        }
        return object;
    }
}

需要注意的是

  • 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.

🌴把消息写入文件

该项操作我们分为五步来进行实现

🚩检查参数是否合法

我们检查一下当前要写入的队列对应的文件是否存在.

如果不合法,我们抛出异常,但是这时候抛什么异常合适呢?

这里我们干脆自定义个异常,作为公共类,用于mq 的业务逻辑中, 出现的异常, 就抛出这个异常对象, 同时在构造方法中指定出现异常的原因信息

自定义异常代码实现如下:

/*
 * 自定义一个异常类. 如果是我们的 mq 的业务逻辑中, 出现的异常, 就抛出这个异常对象,
 * 同时在构造方法中指定出现异常的原因信息
 */
public class MqException extends Exception {
    public MqException(String reason) {
        super(reason);
    }
}

第一步实现代码如下:

🚩对Massage对象序列化

调用公共类的方法即可

🚩计算出该 Message 对象的 offsetBeg 和 offsetEnd

  1. 首先我们获取存储Message对象的相应文件
  2. 计算出存储Message对象文件的大小
  3. 此时 offsetBeg = Message对象文件的大小 + 4
  4. offsetEnd = Message对象文件的大小 + 4 + message 自身长度

实现如下:

🚩写入消息到数据文件

注意这里我们在进行写入操作的时候,需要先写入4字节的头部长度来表示Message的长度

那么我们怎么让一个int类型的数字占四个字节呢?

这里我们可以一个一个字节进行填充。

但这样太麻烦了,我们可以使用Java 标准库已经给我们提供了现成的类 为:DataOutputStream / DataInputStream

实现如下:

🚩更新消息统计文件

最后不要忘了更新我们的消息统计文件

读取消息统计文件,总消息数与有效消息数加一即可

🚩特别注意

此时我们操作还存在一个很大问题

那就是多线程问题,由于这项操作并不是原子性的

所以我们要对queue对象进行操作,这里IDEA会报一个警告,我们这里可以不用管,

它警告的意思是:由于我们是对传入变量进行加锁的,它提示我们这项加锁可能无效

🚩完整代码实现

// 这个方法用来把一个新的消息, 放到队列对应的文件中.
// queue 表示要把消息写入的队列. message 则是要写的消息.
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
    // 1. 检查一下当前要写入的队列对应的文件是否存在.
    if (!checkFilesExits(queue.getName())) {
        throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName());
    }
    // 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.
    byte[] messageBinary = BinaryTool.toBytes(message);
    synchronized (queue) {
        // 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd
        // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
        // offsetEnd 就是当前文件长度 + 4 + message 自身长度.
        File queueDataFile = new File(getQueueDataPath(queue.getName()));
        // 通过这个方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
        message.setOffsetBeg(queueDataFile.length() + 4);
        message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
        // 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.
        try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
            try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                // 接下来要先写当前消息的长度, 占据 4 个字节的~~
                dataOutputStream.writeInt(messageBinary.length);
                // 写入消息本体
                dataOutputStream.write(messageBinary);
            }
        }
        // 5. 更新消息统计文件
        Stat stat = readStat(queue.getName());
        stat.totalCount += 1;
        stat.validCount += 1;
        writeStat(queue.getName(), stat);
    }
}

⭕总结

关于《【消息队列开发】 把消息写入文件》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章
|
7月前
|
消息中间件 Java 数据库
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
|
7月前
|
消息中间件 存储 测试技术
【消息队列开发】 实现MemoryDataCenterTests类——测试管理内存数据
【消息队列开发】 实现MemoryDataCenterTests类——测试管理内存数据
|
7月前
|
消息中间件 存储 安全
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
|
7月前
|
消息中间件 安全
【消息队列开发】 虚拟主机设计——操作绑定
【消息队列开发】 虚拟主机设计——操作绑定
|
7月前
|
消息中间件 网络协议 Java
【消息队列开发】 实现BrokerServer类——本体服务器
【消息队列开发】 实现BrokerServer类——本体服务器
|
7月前
|
消息中间件 Java
【消息队列开发】 实现消费者订阅消息
【消息队列开发】 实现消费者订阅消息
|
7月前
|
消息中间件 存储 Java
【消息队列开发】 实现DiskDataCenter类——管理所有硬盘上的数据
【消息队列开发】 实现DiskDataCenter类——管理所有硬盘上的数据
|
7月前
|
消息中间件
【消息队列开发】 实现Router类——交换机的转发规则
【消息队列开发】 实现Router类——交换机的转发规则
|
7月前
|
消息中间件
【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中
【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中
|
7月前
|
消息中间件 安全
【消息队列开发】 虚拟主机设计——操作队列
【消息队列开发】 虚拟主机设计——操作队列