序列化

本文涉及的产品
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 本文从源码角度分析Zookeeper的序列化机制,重点解析jute包中的InputArchive接口及其实现类BinaryInputArchive,介绍其在反序列化过程中的核心方法与数据读取逻辑,揭示Zookeeper在网络通信与数据存储中的序列化实现原理。

一、前言  在完成了前面的理论学习后,现在可以从源码角度来解析Zookeeper的细节,首先笔者想从序列化入手,因为在网络通信、数据存储中都用到了序列化,下面开始分析。二、序列化  序列化主要在zookeeper.jute包中,其中涉及的主要接口如下    · InputArchive    · OutputArchive    · Index    · Record2.1 InputArchive  其是所有反序列化器都需要实现的接口,其方法如下 InputArchive的类结构如下  1. BinaryInputArchive 2. CsvInputArchive 

Java

运行代码复制代码

public class BinaryInputArchive implements InputArchive {

   // DataInput接口,用于从二进制流中读取字节

   private DataInput in;

 

   // 静态方法,用于获取Archive

   static public BinaryInputArchive getArchive(InputStream strm) {

       return new BinaryInputArchive(new DataInputStream(strm));

   }

 

   // 内部类,对应BinaryInputArchive索引

   static private class BinaryIndex implements Index {

       private int nelems;

       BinaryIndex(int nelems) {

           this.nelems = nelems;

       }

       public boolean done() {

           return (nelems <= 0);

       }

       public void incr() {

           nelems--;

       }

   }

   /** Creates a new instance of BinaryInputArchive */

   // 构造函数

   public BinaryInputArchive(DataInput in) {

       this.in = in;

   }

 

   // 读取字节

   public byte readByte(String tag) throws IOException {

       return in.readByte();

   }

 

   // 读取boolean类型

   public boolean readBool(String tag) throws IOException {

       return in.readBoolean();

   }

 

   // 读取int类型

   public int readInt(String tag) throws IOException {

       return in.readInt();

   }

 

   // 读取long类型

   public long readLong(String tag) throws IOException {

       return in.readLong();

   }

 

   // 读取float类型

   public float readFloat(String tag) throws IOException {

       return in.readFloat();

   }

 

   // 读取double类型

   public double readDouble(String tag) throws IOException {

       return in.readDouble();

   }

 

   // 读取String类型

   public String readString(String tag) throws IOException {

       // 确定长度

       int len = in.readInt();

       if (len == -1) return null;

       byte b[] = new byte[len];

       // 从输入流中读取一些字节,并将它们存储在缓冲区数组b中

       in.readFully(b);

       return new String(b, "UTF8");

   }

 

   // 最大缓冲值

   static public final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);


   // 读取缓冲

   public byte[] readBuffer(String tag) throws IOException {

       // 确定长度

       int len = readInt(tag);

       if (len == -1) return null;

       // Since this is a rough sanity check, add some padding to maxBuffer to

       // make up for extra fields, etc. (otherwise e.g. clients may be able to

       // write buffers larger than we can read from disk!)

       if (len < 0 || len > maxBuffer + 1024) { // 检查长度是否合理

           throw new IOException("Unreasonable length = " + len);

       }

       byte[] arr = new byte[len];

       // 从输入流中读取一些字节,并将它们存储在缓冲区数组arr中

       in.readFully(arr);

       return arr;

   }

 

   // 读取记录

   public void readRecord(Record r, String tag) throws IOException {

       // 反序列化,动态调用

       r.deserialize(this, tag);

   }

 

   // 开始读取记录,实现为空

   public void startRecord(String tag) throws IOException {}

 

   // 结束读取记录,实现为空

   public void endRecord(String tag) throws IOException {}

 

   // 开始读取向量

   public Index startVector(String tag) throws IOException {

       // 确定长度

       int len = readInt(tag);

       if (len == -1) {

           return null;

       }

       // 返回索引

       return new BinaryIndex(len);

   }

 

   // 结束读取向量

   public void endVector(String tag) throws IOException {}

 

   // 开始读取Map

   public Index startMap(String tag) throws IOException {

       // 返回索引

       return new BinaryIndex(readInt(tag));

   }

 

   // 结束读取Map,实现为空

   public void endMap(String tag) throws IOException {}

 

}



// 读取并转化为字符

char c = (char) stream.read();

if (tag == null || "".equals(tag)) {

if (c != '\n' && c != '\r') { // 进行判断

throw new IOException("Error deserializing record.");

} else {

return;

}

}

if (c != '}') { // 进行判断

throw new IOException("Error deserializing "+tag);

}

// 读取并转化为字符

c = (char) stream.read();

if (c != ',') {

// 推回缓冲区

stream.unread(c);

}

return;

}

// 开始读取vector

public Index startVector(String tag) throws IOException {

char c1 = (char) stream.read();

char c2 = (char) stream.read();

if (c1 != 'v' || c2 != '{') {

throw new IOException("Error deserializing "+tag);

}

return new CsvIndex();

}

// 结束读取vector

public void endVector(String tag) throws IOException {

char c = (char) stream.read();

if (c != '}') {

throw new IOException("Error deserializing "+tag);

}

c = (char) stream.read();

if (c != ',') {

stream.unread(c);

}

return;

}

// 开始读取Map

public Index startMap(String tag) throws IOException {

char c1 = (char) stream.read();

char c2 = (char) stream.read();

if (c1 != 'm' || c2 != '{') {

throw new IOException("Error deserializing "+tag);

}

return new CsvIndex();

}

// 结束读取Map

public void endMap(String tag) throws IOException {

char c = (char) stream.read();

if (c != '}') {

throw new IOException("Error deserializing "+tag);

}

c = (char) stream.read();

if (c != ',') {

stream.unread(c);

}

return;

}

}

相关文章
|
4月前
|
监控 Java 测试技术
OOM排查之路:一次曲折的线上故障复盘
本文记录了一次Paimon数据湖与RocksDB集成服务线上频繁OOM的排查历程。通过分析线程暴增、堆外内存泄漏,最终定位到SDK中RocksDB的JNI内存未释放问题,并借助Flink重构写入链路彻底解决。分享了MAT、NMT、async-profiler等工具的实战经验与排查思路,为类似技术栈提供借鉴。
OOM排查之路:一次曲折的线上故障复盘
|
4月前
|
人工智能 JSON 数据挖掘
大模型应用开发中MCP与Function Call的关系与区别
MCP与Function Call是大模型应用中两大关键技术。前者为跨模型标准化通信协议,实现工具与模型解耦;后者是模型调用外部功能的内置机制。二者互补协作,推动AI应用向更开放、灵活、可扩展的方向发展。
|
4月前
|
Java 网络安全 开发工具
[MES]不合格订单接入提醒功能(☆☆☆) 1.代码运行
本文介绍入职后如何快速搭建开发环境并运行项目,包括克隆代码、配置JDK/Maven/Git等工具的求助策略,并模拟真实需求:实现不合格工单超30分钟自动通知(短信/钉钉),涉及Git、Maven、SpringBoot及定时任务技术,提升新人实战能力。
|
4月前
|
存储 安全 网络协议
第三章 web阶段
HTTP协议是超文本传输协议,基于TCP实现,规定客户端与服务器间数据通信规则。常见请求方式GET与POST在参数传递、安全性和应用场景上有所不同。RESTful风格通过URL定位资源,请求方式定义操作类型。常见状态码如200(成功)、404(未找到)、500(服务器错误)等。转发在服务端完成,一次请求;重定向由客户端发起,两次请求。Cookie通过Set-Cookie和Cookie头实现会话跟踪,存储于客户端;Session依赖Cookie传递ID,数据存于服务端,更安全但存在集群共享问题。
|
4月前
|
canal 关系型数据库 MySQL
微服务原理篇(Canal-Redis)
本课程讲解多数据源同步方案,重点介绍Canal+MQ实现MySQL到Elasticsearch的数据同步机制,涵盖Canal伪装MySQL slave原理、binlog解析、消息顺序性保障,并深入Redis持久化、集群模式、缓存一致性及分布式锁等核心知识点。
 微服务原理篇(Canal-Redis)
|
4月前
|
缓存 关系型数据库 MySQL
微服务原理篇(XXLJOB-幂等-MySQL)
本课程介绍XXL-JOB分布式任务调度平台,涵盖其优势、组成结构及搭建方法,学习如何实现定时任务、避免重复执行,并掌握热点缓存更新、幂等处理、数据库索引优化与SQL调优等实战技能。
|
4月前
|
消息中间件 Java UED
异步消息组件MQ基础
本课程介绍MQ的应用场景及RabbitMQ入门,涵盖同步与异步调用区别、消息队列的解耦与流量削峰作用,学习RabbitMQ收发消息、交换机类型、队列特性及在商城项目中的应用。
异步消息组件MQ基础
|
4月前
|
关系型数据库 MySQL Java
开发环境搭建
工欲善其事,必先利其器。学习前请确保电脑内存16G以上(建议32G),安装VMware及CentOS7虚拟机,配置网络与IP,远程连接使用FinalShell。苹果用户需安装Docker并部署MySQL8。下载课程资料、Maven仓库及虚拟机镜像,导入后设置IDEA开发环境,配置JDK11、自动导包与编码。通过Git Fork项目至个人仓库并克隆到本地,完成环境搭建。
|
4月前
|
安全 算法 Java
第一章 Java基础
本文系统讲解Java核心知识,涵盖基础语法、面向对象、集合类、异常处理、IO流、多线程、JVM原理、反射泛型及Tomcat优化等内容,结合代码示例与底层机制分析,助力深入理解Java编程与性能调优。
 第一章 Java基础
|
4月前
|
自然语言处理 搜索推荐 Java
ES分布式搜索引擎入门
本课程介绍Elasticsearch的核心概念与应用,涵盖倒排索引原理、IK分词器使用及Java Client操作,实现高效全文检索、增删改查、批量导入、查询优化等功能,提升搜索性能与体验。
ES分布式搜索引擎入门