Zookeeper 序列化

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 读完这篇文章你将会收获到• 在 Zookeeper 源码项目中新建模块,使用 Jute 进行序列化和反序列化• 修改 Jute 中的 buffer size 来序列化/反序列化大对象

读完这篇文章你将会收获到

  • 在 Zookeeper 源码项目中新建模块,使用 Jute 进行序列化和反序列化
  • 修改 Jute 中的 buffer size 来序列化/反序列化大对象


序言


前面的文章 我们得知、ZK 的客户端和服务端会通过网络进行一系列的数据交互(节点中的数据内容、ACL 信息),而我们知道从一个内存对象到网络传输,那么就会涉及到序列化和反序列化操作。ZK 使用到是一个叫 Jute 的序列化组件(对不起,我真的没听过,尴尬了)


Jute 介绍


Jute 是 ZK 中序列化的组件,前身是 Hadoop Record IO 中的序列化组件。

ZK 从第一个正式对外的版本开始,就一直使用 Jute 组件来进行网络数据传输和本地磁盘数据存储的序列化和反序列化工作。并不是 Jute 优秀到不被其他序列化框架所超越、而是替换这种基层组件、老版本的兼容性问题很难处理,并且 Jute 的序列化能力并不是 ZK 性能上的瓶颈,so 现在还是这个序列化组件。


Jute 使用


Talk is cheap. Show me the code

在 zk 源代码中新建一个模块


构建一个新的模块

加入依赖

dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper-jute</artifactId>
            <version>3.7.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
复制代码


创建 POJO , 并实现 Record 接口

@Data
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class Person implements Record {
    private String name;
    private int age;
    @Override
    public void serialize(OutputArchive archive, String tag) throws IOException {
        archive.startRecord(this, tag);
        archive.writeInt(age, "age");
        archive.writeString(name, "name");
        archive.endRecord(this, tag);
    }
    @Override
    public void deserialize(InputArchive archive, String tag) throws IOException {
        archive.startRecord(tag);
        age = archive.readInt("age");
        name = archive.readString("name");
        archive.endRecord(tag);
    }
}
复制代码


创建单元测试

@Test
    public void serializeTest() throws IOException {
        // 开始序列化
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(byteArrayOutputStream);
        new Person("coderLi", 100).serialize(binaryOutputArchive, "person");
        // 通常是 TCP 网络传输对象
        ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
        // 反序列化
        ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(byteBuffer);
        BinaryInputArchive binaryInputArchive = BinaryInputArchive.getArchive(byteBufferInputStream);
        Person person = new Person();
        person.deserialize(binaryInputArchive, "person");
        System.out.println(person.toString());
        byteArrayOutputStream.close();
        byteBufferInputStream.close();
    }
复制代码


运行

Person(name=coderLi, age=100)
复制代码


可能出现的问题

java.io.IOException: Unreasonable length = 1668244581
 at org.apache.jute.BinaryInputArchive.checkLength(BinaryInputArchive.java:166)
 at org.apache.jute.BinaryInputArchive.readString(BinaryInputArchive.java:116)
 at com.coder.li.data.Person.deserialize(Person.java:36)
复制代码


根据报错信息找到 BinaryInputArchive

public class BinaryInputArchive implements InputArchive {
    public static final String UNREASONBLE_LENGTH = "Unreasonable length = ";
    // CHECKSTYLE.OFF: ConstantName - for backward compatibility
    public static final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);
    // CHECKSTYLE.ON:
    private static final int extraMaxBuffer;
    static {
        final Integer configuredExtraMaxBuffer =
            Integer.getInteger("zookeeper.jute.maxbuffer.extrasize", maxBuffer);
        if (configuredExtraMaxBuffer < 1024) {
            // Earlier hard coded value was 1024, So the value should not be less than that value
            extraMaxBuffer = 1024;
        } else {
            extraMaxBuffer = configuredExtraMaxBuffer;
        }
    }
    ....
    ...
    private void checkLength(int len) throws IOException {
        if (len < 0 || len > maxBufferSize + extraMaxBufferSize) {
            throw new IOException(UNREASONBLE_LENGTH + len);
        }
    }
复制代码


我们系统中并没有配置这两个参数,所以这俩个 buffer size 的和应该是 1024

我们在启动参数中配置

-Djute.maxbuffer=0 -Dzookeeper.jute.maxbuffer.extrasize=1668244581
复制代码


只要 maxBufferSize + extraMaxBufferSize 和大于等于 1668244581 即可(注意不要设置过大、导致相加结果溢出变为负数)

后来排查问题发现原来导致这个问题的出现是在反序列化的时候、自己写代码出错导致,但是借此来认识到一个坑还是不错的

或者你在实体类中有一个非常大的对象需要被序列化和反序列化、也会抛出这个异常

private String name;
    private int age;
    private byte[] bigData;
    public Person() {
    }
    public Person(String name, int age) {
        this.age = age;
        this.name = name;
        bigData = new byte[2048 * 2048];
    }
复制代码


Jute 使用流程

  • 实体类实现接口 Recordserializedeserialize
  • 构造 BinaryOutputArchive
  • 序列化
  • 反序列化


相关组件


Record

public interface Record {
    void serialize(OutputArchive archive, String tag) throws IOException;
    void deserialize(InputArchive archive, String tag) throws IOException;
}
复制代码

Zookeeper Watcher 流程分析 文章中我们分析的 WatcherEvent 这个实体类就是实现了 Record 接口


InputArchive/OutputArchive


OutputArchive 类图


InputArchive 类图

这两个接口分别是 Jute 底层序列化器和反序列化器接口定义,实现类主要有 BinaryOutputArchiveBinaryInputArchive

public interface InputArchive {
byte readByte(String tag) throws IOException;
boolean readBool(String tag) throws IOException;
int readInt(String tag) throws IOException;
long readLong(String tag) throws IOException;
float readFloat(String tag) throws IOException;
double readDouble(String tag) throws IOException;
String readString(String tag) throws IOException;
byte[] readBuffer(String tag) throws IOException;
void readRecord(Record r, String tag) throws IOException;
..........
}
复制代码


都是定义了一些非常基本的方法

其实都是依赖于 Java 的 InputStreamOutputStream 进行操作的



相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
存储 XML Apache
43-微服务技术栈(高级):分布式协调服务zookeeper源码篇(序列化)
在完成了前面的理论学习后,现在可以从源码角度来解析Zookeeper的细节,首先笔者想从序列化入手,因为在网络通信、数据存储中都用到了序列化,下面开始分析。
116 0
|
XML Java 程序员
zookeeper源码分析--序列化篇
zookeeper源码分析--序列化篇
152 0
|
存储
【Zookeeper】源码分析之序列化
在完成了前面的理论学习后,现在可以从源码角度来解析Zookeeper的细节,首先笔者想从序列化入手,因为在网络通信、数据存储中都用到了序列化,下面开始分析。
92 0
【Zookeeper】源码分析之序列化
|
存储 网络协议 Java
【分布式】Zookeeper序列化及通信协议
  前面介绍了Zookeeper的系统模型,下面进一步学习Zookeeper的底层序列化机制,Zookeeper的客户端与服务端之间会进行一系列的网络通信来实现数据传输,Zookeeper使用Jute组件来完成数据的序列化和反序列化操作。
129 0
【分布式】Zookeeper序列化及通信协议
|
12天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
30 2
|
3月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0
|
1月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
90 1
|
3月前
|
监控 Dubbo Java
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
58 0
|
3月前
|
NoSQL 中间件 API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(下)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
81 2
|
3月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(上)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
73 0