一、前言 在完成了前面的理论学习后,现在可以从源码角度来解析Zookeeper的细节,首先笔者想从序列化入手,因为在网络通信、数据存储中都用到了序列化,下面开始分析。二、序列化 序列化主要在zookeeper.jute包中,其中涉及的主要接口如下 · InputArchive · OutputArchive · Index · Record2.1 InputArchive 其是所有反序列化器都需要实现的接口,其方法如下 InputArchive的类结构如下 1. BinaryInputArchive 2. CsvInputArchive
Java
运行代码复制代码
public class BinaryInputArchive implements InputArchive {
// DataInput接口,用于从二进制流中读取字节
private DataInput in;
// 静态方法,用于获取Archive
static public BinaryInputArchive getArchive(InputStream strm) {
return new BinaryInputArchive(new DataInputStream(strm));
}
// 内部类,对应BinaryInputArchive索引
static private class BinaryIndex implements Index {
private int nelems;
BinaryIndex(int nelems) {
this.nelems = nelems;
}
public boolean done() {
return (nelems <= 0);
}
public void incr() {
nelems--;
}
}
/** Creates a new instance of BinaryInputArchive */
// 构造函数
public BinaryInputArchive(DataInput in) {
this.in = in;
}
// 读取字节
public byte readByte(String tag) throws IOException {
return in.readByte();
}
// 读取boolean类型
public boolean readBool(String tag) throws IOException {
return in.readBoolean();
}
// 读取int类型
public int readInt(String tag) throws IOException {
return in.readInt();
}
// 读取long类型
public long readLong(String tag) throws IOException {
return in.readLong();
}
// 读取float类型
public float readFloat(String tag) throws IOException {
return in.readFloat();
}
// 读取double类型
public double readDouble(String tag) throws IOException {
return in.readDouble();
}
// 读取String类型
public String readString(String tag) throws IOException {
// 确定长度
int len = in.readInt();
if (len == -1) return null;
byte b[] = new byte[len];
// 从输入流中读取一些字节,并将它们存储在缓冲区数组b中
in.readFully(b);
return new String(b, "UTF8");
}
// 最大缓冲值
static public final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);
// 读取缓冲
public byte[] readBuffer(String tag) throws IOException {
// 确定长度
int len = readInt(tag);
if (len == -1) return null;
// Since this is a rough sanity check, add some padding to maxBuffer to
// make up for extra fields, etc. (otherwise e.g. clients may be able to
// write buffers larger than we can read from disk!)
if (len < 0 || len > maxBuffer + 1024) { // 检查长度是否合理
throw new IOException("Unreasonable length = " + len);
}
byte[] arr = new byte[len];
// 从输入流中读取一些字节,并将它们存储在缓冲区数组arr中
in.readFully(arr);
return arr;
}
// 读取记录
public void readRecord(Record r, String tag) throws IOException {
// 反序列化,动态调用
r.deserialize(this, tag);
}
// 开始读取记录,实现为空
public void startRecord(String tag) throws IOException {}
// 结束读取记录,实现为空
public void endRecord(String tag) throws IOException {}
// 开始读取向量
public Index startVector(String tag) throws IOException {
// 确定长度
int len = readInt(tag);
if (len == -1) {
return null;
}
// 返回索引
return new BinaryIndex(len);
}
// 结束读取向量
public void endVector(String tag) throws IOException {}
// 开始读取Map
public Index startMap(String tag) throws IOException {
// 返回索引
return new BinaryIndex(readInt(tag));
}
// 结束读取Map,实现为空
public void endMap(String tag) throws IOException {}
}
// 读取并转化为字符
char c = (char) stream.read();
if (tag == null || "".equals(tag)) {
if (c != '\n' && c != '\r') { // 进行判断
throw new IOException("Error deserializing record.");
} else {
return;
}
}
if (c != '}') { // 进行判断
throw new IOException("Error deserializing "+tag);
}
// 读取并转化为字符
c = (char) stream.read();
if (c != ',') {
// 推回缓冲区
stream.unread(c);
}
return;
}
// 开始读取vector
public Index startVector(String tag) throws IOException {
char c1 = (char) stream.read();
char c2 = (char) stream.read();
if (c1 != 'v' || c2 != '{') {
throw new IOException("Error deserializing "+tag);
}
return new CsvIndex();
}
// 结束读取vector
public void endVector(String tag) throws IOException {
char c = (char) stream.read();
if (c != '}') {
throw new IOException("Error deserializing "+tag);
}
c = (char) stream.read();
if (c != ',') {
stream.unread(c);
}
return;
}
// 开始读取Map
public Index startMap(String tag) throws IOException {
char c1 = (char) stream.read();
char c2 = (char) stream.read();
if (c1 != 'm' || c2 != '{') {
throw new IOException("Error deserializing "+tag);
}
return new CsvIndex();
}
// 结束读取Map
public void endMap(String tag) throws IOException {
char c = (char) stream.read();
if (c != '}') {
throw new IOException("Error deserializing "+tag);
}
c = (char) stream.read();
if (c != ',') {
stream.unread(c);
}
return;
}
}