🍃前言
本次开发任务
- 实现消息序列化与反序列化
- 把消息写入文件
🎋实现消息序列化与反序列化
对于序列化与反序列化的模块
在最开始设计时,我就已经将它归入公共模块,所以我们common文件路径下创建BinaryTool类来进行实现
消息序列化:就是把一个对象(结构化的数据)转成一个字符串或者字符数组…
可能很多人都会有一个疑问,为什么要进行序列化呢?
- 序列化后,方便存储和运输
- 我们需要把消息通过文件的方式进行存储,但是呢,文件只能存储字符串或二进制数据,不能直接存对象
所以我们要进行序列化
这里由于Message 里面存储的是二进制数据,不太方便 JSON 进行序列化
所以这里,直接使用二进制的序列化方式,针对 Message 对象进行序列化
针对二进制序列化,也有很多解决方案
- java标准库提供了序列化方案 ObjectInputStream 和 ObjectOutputStream
- Hessian也是一种解决方案
- protobuffer
- …
这里咱们使用java标准库提供的序列化方案 ObjectInputStream 和 ObjectOutputStream
具体实现如下:
// 下列的逻辑, 并不仅仅是 Message, 其他的 Java 中的对象, 也是可以通过这样的逻辑进行序列化和反序列化的. // 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口. public class BinaryTool { // 把一个对象序列化成一个字节数组 public static byte[] toBytes(Object object) throws IOException { // 这个流对象相当于一个变长的字节数组. // 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[] try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) { // 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到 // ObjectOutputStream 中. // 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, // 最终结果就写入到 ByteArrayOutputStream 里了 objectOutputStream.writeObject(object); } // 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[] return byteArrayOutputStream.toByteArray(); } } // 把一个字节数组, 反序列化成一个对象 public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException { Object object = null; try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) { try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) { // 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化. object = objectInputStream.readObject(); } } return object; } }
需要注意的是
- 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.
🌴把消息写入文件
该项操作我们分为五步来进行实现
🚩检查参数是否合法
我们检查一下当前要写入的队列对应的文件是否存在.
如果不合法,我们抛出异常,但是这时候抛什么异常合适呢?
这里我们干脆自定义个异常,作为公共类,用于mq 的业务逻辑中, 出现的异常, 就抛出这个异常对象, 同时在构造方法中指定出现异常的原因信息
自定义异常代码实现如下:
/* * 自定义一个异常类. 如果是我们的 mq 的业务逻辑中, 出现的异常, 就抛出这个异常对象, * 同时在构造方法中指定出现异常的原因信息 */ public class MqException extends Exception { public MqException(String reason) { super(reason); } }
第一步实现代码如下:
🚩对Massage对象序列化
调用公共类的方法即可
🚩计算出该 Message 对象的 offsetBeg 和 offsetEnd
- 首先我们获取存储Message对象的相应文件
- 计算出存储Message对象文件的大小
- 此时 offsetBeg = Message对象文件的大小 + 4
- offsetEnd = Message对象文件的大小 + 4 + message 自身长度
实现如下:
🚩写入消息到数据文件
注意这里我们在进行写入操作的时候,需要先写入4字节的头部长度来表示Message的长度
那么我们怎么让一个int类型的数字占四个字节呢?
这里我们可以一个一个字节进行填充。
但这样太麻烦了,我们可以使用Java 标准库已经给我们提供了现成的类 为:DataOutputStream / DataInputStream
实现如下:
🚩更新消息统计文件
最后不要忘了更新我们的消息统计文件
读取消息统计文件,总消息数与有效消息数加一即可
🚩特别注意
此时我们操作还存在一个很大问题
那就是多线程问题,由于这项操作并不是原子性的
所以我们要对queue对象进行操作,这里IDEA会报一个警告,我们这里可以不用管,
它警告的意思是:由于我们是对传入变量进行加锁的,它提示我们这项加锁可能无效
🚩完整代码实现
// 这个方法用来把一个新的消息, 放到队列对应的文件中. // queue 表示要把消息写入的队列. message 则是要写的消息. public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException { // 1. 检查一下当前要写入的队列对应的文件是否存在. if (!checkFilesExits(queue.getName())) { throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName()); } // 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组. byte[] messageBinary = BinaryTool.toBytes(message); synchronized (queue) { // 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4 // offsetEnd 就是当前文件长度 + 4 + message 自身长度. File queueDataFile = new File(getQueueDataPath(queue.getName())); // 通过这个方法 queueDataFile.length() 就能获取到文件的长度. 单位字节. message.setOffsetBeg(queueDataFile.length() + 4); message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length); // 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾. try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) { try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { // 接下来要先写当前消息的长度, 占据 4 个字节的~~ dataOutputStream.writeInt(messageBinary.length); // 写入消息本体 dataOutputStream.write(messageBinary); } } // 5. 更新消息统计文件 Stat stat = readStat(queue.getName()); stat.totalCount += 1; stat.validCount += 1; writeStat(queue.getName(), stat); } }
⭕总结
关于《【消息队列开发】 把消息写入文件》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下