读完这篇文章你将会收获到
- 在 Zookeeper 源码项目中新建模块,使用 Jute 进行序列化和反序列化
- 修改 Jute 中的 buffer size 来序列化/反序列化大对象
序言
从 前面的文章 我们得知、ZK 的客户端和服务端会通过网络进行一系列的数据交互(节点中的数据内容、ACL 信息),而我们知道从一个内存对象到网络传输,那么就会涉及到序列化和反序列化操作。ZK 使用到是一个叫 Jute 的序列化组件(对不起,我真的没听过,尴尬了)
Jute 介绍
Jute 是 ZK 中序列化的组件,前身是 Hadoop Record IO 中的序列化组件。
ZK 从第一个正式对外的版本开始,就一直使用 Jute 组件来进行网络数据传输和本地磁盘数据存储的序列化和反序列化工作。并不是 Jute 优秀到不被其他序列化框架所超越、而是替换这种基层组件、老版本的兼容性问题很难处理,并且 Jute 的序列化能力并不是 ZK 性能上的瓶颈,so 现在还是这个序列化组件。
Jute 使用
Talk is cheap. Show me the code
在 zk 源代码中新建一个模块
构建一个新的模块
加入依赖
dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper-jute</artifactId> <version>3.7.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.0-SNAPSHOT</version> </dependency> </dependencies> 复制代码
创建 POJO , 并实现 Record
接口
@Data @Accessors(chain = true) @AllArgsConstructor @NoArgsConstructor public class Person implements Record { private String name; private int age; @Override public void serialize(OutputArchive archive, String tag) throws IOException { archive.startRecord(this, tag); archive.writeInt(age, "age"); archive.writeString(name, "name"); archive.endRecord(this, tag); } @Override public void deserialize(InputArchive archive, String tag) throws IOException { archive.startRecord(tag); age = archive.readInt("age"); name = archive.readString("name"); archive.endRecord(tag); } } 复制代码
创建单元测试
@Test public void serializeTest() throws IOException { // 开始序列化 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(byteArrayOutputStream); new Person("coderLi", 100).serialize(binaryOutputArchive, "person"); // 通常是 TCP 网络传输对象 ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray()); // 反序列化 ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(byteBuffer); BinaryInputArchive binaryInputArchive = BinaryInputArchive.getArchive(byteBufferInputStream); Person person = new Person(); person.deserialize(binaryInputArchive, "person"); System.out.println(person.toString()); byteArrayOutputStream.close(); byteBufferInputStream.close(); } 复制代码
运行
Person(name=coderLi, age=100) 复制代码
可能出现的问题
java.io.IOException: Unreasonable length = 1668244581 at org.apache.jute.BinaryInputArchive.checkLength(BinaryInputArchive.java:166) at org.apache.jute.BinaryInputArchive.readString(BinaryInputArchive.java:116) at com.coder.li.data.Person.deserialize(Person.java:36) 复制代码
根据报错信息找到 BinaryInputArchive
public class BinaryInputArchive implements InputArchive { public static final String UNREASONBLE_LENGTH = "Unreasonable length = "; // CHECKSTYLE.OFF: ConstantName - for backward compatibility public static final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff); // CHECKSTYLE.ON: private static final int extraMaxBuffer; static { final Integer configuredExtraMaxBuffer = Integer.getInteger("zookeeper.jute.maxbuffer.extrasize", maxBuffer); if (configuredExtraMaxBuffer < 1024) { // Earlier hard coded value was 1024, So the value should not be less than that value extraMaxBuffer = 1024; } else { extraMaxBuffer = configuredExtraMaxBuffer; } } .... ... private void checkLength(int len) throws IOException { if (len < 0 || len > maxBufferSize + extraMaxBufferSize) { throw new IOException(UNREASONBLE_LENGTH + len); } } 复制代码
我们系统中并没有配置这两个参数,所以这俩个 buffer size 的和应该是 1024
我们在启动参数中配置
-Djute.maxbuffer=0 -Dzookeeper.jute.maxbuffer.extrasize=1668244581 复制代码
只要 maxBufferSize + extraMaxBufferSize
和大于等于 1668244581 即可(注意不要设置过大、导致相加结果溢出变为负数)
后来排查问题发现原来导致这个问题的出现是在反序列化的时候、自己写代码出错导致,但是借此来认识到一个坑还是不错的
或者你在实体类中有一个非常大的对象需要被序列化和反序列化、也会抛出这个异常
private String name; private int age; private byte[] bigData; public Person() { } public Person(String name, int age) { this.age = age; this.name = name; bigData = new byte[2048 * 2048]; } 复制代码
Jute 使用流程
- 实体类实现接口
Record
的serialize
和deserialize
- 构造
BinaryOutputArchive
- 序列化
- 反序列化
相关组件
Record
public interface Record { void serialize(OutputArchive archive, String tag) throws IOException; void deserialize(InputArchive archive, String tag) throws IOException; } 复制代码
在 Zookeeper Watcher 流程分析 文章中我们分析的 WatcherEvent
这个实体类就是实现了 Record
接口
InputArchive/OutputArchive
OutputArchive 类图
InputArchive 类图
这两个接口分别是 Jute 底层序列化器和反序列化器接口定义,实现类主要有 BinaryOutputArchive
和 BinaryInputArchive
public interface InputArchive { byte readByte(String tag) throws IOException; boolean readBool(String tag) throws IOException; int readInt(String tag) throws IOException; long readLong(String tag) throws IOException; float readFloat(String tag) throws IOException; double readDouble(String tag) throws IOException; String readString(String tag) throws IOException; byte[] readBuffer(String tag) throws IOException; void readRecord(Record r, String tag) throws IOException; .......... } 复制代码
都是定义了一些非常基本的方法
其实都是依赖于 Java 的 InputStream
和 OutputStream
进行操作的