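Most of the time we work with ZooKeeper (zk) through the zkclient jar, yet we rarely know what actually happens inside zkclient. Beyond simply creating, deleting, and watching nodes, there is room to understand the ideas behind zk more deeply.
Let's dig into zk's internals:
One thing is clear: zk is written in Java, so this material should be fairly easy for Java programmers to follow.
Now let's look at the serialization part of zk:
In jute there is an interface called Record, which defines the serialization and deserialization operations: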
    package org.apache.jute;

    import java.io.IOException;

    /**
     * Interface that is implemented by generated classes.
     *
     */
    public interface Record {
        public void serialize(OutputArchive archive, String tag)
            throws IOException;
        public void deserialize(InputArchive archive, String tag)
            throws IOException;
    }
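Jute is the tool zk ships for its own serialization and deserialization. It descends from Hadoop's record I/O, which is why generated classes still carry a "hadoop record compiler" header. Its core archive classes are:
BinaryInputArchive: handles binary data
BinaryOutputArchive
CsvInputArchive: handles CSV data
CsvOutputArchive
XmlInputArchive: handles XML data
XmlOutputArchive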
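To make the split between a record and its archive concrete, here is a minimal sketch of the same idea using only the JDK. `MiniOutputArchive`, `MiniRecord`, `MiniBinaryArchive`, and `MiniCsvArchive` are hypothetical names invented for this illustration. They are not jute's classes, and the text format skips the escaping a real archive would need.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class MiniArchiveDemo {
    // One record definition, reused unchanged with both archives.
    static MiniRecord bean(int number, String name) {
        return archive -> {
            archive.writeInt(number, "number");
            archive.writeString(name, "name");
        };
    }

    // Serializes the record in the binary flavour and returns the byte count.
    static int binarySize(MiniRecord record) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            record.serialize(new MiniBinaryArchive(sink));
            return sink.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes the record in the text flavour and returns the text.
    static String csv(MiniRecord record) {
        try {
            MiniCsvArchive archive = new MiniCsvArchive();
            record.serialize(archive);
            return archive.result();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniRecord record = bean(1, "idea");
        System.out.println("binary bytes: " + binarySize(record));
        System.out.println("csv text: " + csv(record));
    }
}

// Hypothetical, simplified stand-in for an output archive: one method per field type.
interface MiniOutputArchive {
    void writeInt(int value, String tag) throws IOException;
    void writeString(String value, String tag) throws IOException;
}

// Hypothetical stand-in for a record: it only knows the order of its fields.
interface MiniRecord {
    void serialize(MiniOutputArchive archive) throws IOException;
}

// Binary flavour: the tag is ignored and values go to a DataOutputStream.
class MiniBinaryArchive implements MiniOutputArchive {
    private final DataOutputStream out;

    MiniBinaryArchive(ByteArrayOutputStream sink) {
        this.out = new DataOutputStream(sink);
    }

    @Override
    public void writeInt(int value, String tag) throws IOException {
        out.writeInt(value);
    }

    @Override
    public void writeString(String value, String tag) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length); // length prefix, then the raw bytes
        out.write(bytes);
    }
}

// Text flavour: values are joined with commas (no escaping in this sketch).
class MiniCsvArchive implements MiniOutputArchive {
    private final StringBuilder text = new StringBuilder();

    private void append(String value) {
        if (text.length() > 0) {
            text.append(',');
        }
        text.append(value);
    }

    @Override
    public void writeInt(int value, String tag) {
        append(Integer.toString(value));
    }

    @Override
    public void writeString(String value, String tag) {
        append(value);
    }

    String result() {
        return text.toString();
    }
}
```

The record only decides which fields are written and in what order. The archive decides what they look like on the wire, which is why one record class can be paired with binary, CSV, or XML archives.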
Let's walk through a code example to see how jute's archives are used in practice:
    package 源码分析.jute包;

    import lombok.Data;
    import org.apache.jute.InputArchive;
    import org.apache.jute.OutputArchive;
    import org.apache.jute.Record;

    import java.io.IOException;

    /**
     * @author idea
     * @data 2019/10/27
     */
    @Data
    public class TestBean implements Record {

        private int number;

        private String name;

        public TestBean(int number, String name) {
            this.number = number;
            this.name = name;
        }

        public TestBean() {
        }

        @Override
        public void serialize(OutputArchive archive, String tag) throws IOException {
            System.out.println("tag output:" + tag);
            // Write the fields in a fixed order between the record markers.
            archive.startRecord(this, tag);
            archive.writeInt(this.number, "number");
            archive.writeString(this.name, "name");
            archive.endRecord(this, tag);
        }

        @Override
        public void deserialize(InputArchive archive, String tag) throws IOException {
            System.out.println("tag:" + tag);
            // Read the fields back in the same order they were written.
            archive.startRecord(tag);
            this.number = archive.readInt("number");
            this.name = archive.readString("name");
            archive.endRecord(tag);
        }
    }

Next comes our test class:

    package 源码分析.jute包;

    import org.apache.jute.BinaryInputArchive;
    import org.apache.jute.BinaryOutputArchive;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    /**
     * @author idea
     * @data 2019/10/27
     */
    public class TestDemo {

        public static void main(String[] args) throws IOException {
            // Serialize first
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            new TestBean(1, "idea").serialize(boa, "tag1");
            byte[] array = baos.toByteArray();

            // Then deserialize from the same bytes
            ByteArrayInputStream bins = new ByteArrayInputStream(array);
            BinaryInputArchive bia = BinaryInputArchive.getArchive(bins);
            TestBean testBean = new TestBean();
            testBean.deserialize(bia, "tag1");
            System.out.println("int=" + testBean.getNumber() + "--string=" + testBean.getName());

            bins.close();
            baos.close();
        }
    }
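Once the round trip is done, the program prints the data: first the two tag lines emitted by serialize and deserialize, then int=1--string=idea.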
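As a rough picture of what travels through `baos` in the demo, the sketch below reproduces the same field order with plain `DataOutputStream` and `DataInputStream`. It assumes the binary archive writes an int as 4 big-endian bytes and a string as a 4-byte length followed by its UTF-8 bytes, and that the tags never reach the stream. `BinaryLayoutSketch` and its helper methods are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class BinaryLayoutSketch {
    // Writes number then name, mirroring the field order of TestBean.serialize.
    static byte[] write(int number, String name) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(sink);
            out.writeInt(number);
            byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length); // assumed length prefix for strings
            out.write(utf8);
            return sink.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order; nothing in the stream names them.
    static String read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int number = in.readInt();
            byte[] utf8 = new byte[in.readInt()];
            in.readFully(utf8);
            return "int=" + number + "--string=" + new String(utf8, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Renders bytes as space-separated two-digit hex values.
    static String hex(byte[] data) {
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        byte[] data = write(1, "idea");
        System.out.println(hex(data));  // 00 00 00 01 00 00 00 04 69 64 65 61
        System.out.println(read(data)); // int=1--string=idea
    }
}
```

Because the stream carries values only, the reader has to know the field order in advance. That is the reason serialize and deserialize in a Record must list their fields in exactly the same sequence.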
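After reading this you may wonder: does every zk data type really need this much work on the bean before it can be serialized and deserialized? Yes. In the zk source, the classes under the package org.apache.zookeeper.data all carry this kind of elaborate code. Take the Stat object as an example; its source contains the following: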
    // File generated by hadoop record compiler. Do not edit.
    package org.apache.zookeeper.data;

    import org.apache.jute.*;
    public class Stat implements Record {
      private long czxid;
      private long mzxid;
      private long ctime;
      private long mtime;
      private int version;
      private int cversion;
      private int aversion;
      private long ephemeralOwner;
      private int dataLength;
      private int numChildren;
      private long pzxid;
      public Stat() {
      }
      public Stat(
            long czxid,
            long mzxid,
            long ctime,
            long mtime,
            int version,
            int cversion,
            int aversion,
            long ephemeralOwner,
            int dataLength,
            int numChildren,
            long pzxid) {
        this.czxid=czxid;
        this.mzxid=mzxid;
        this.ctime=ctime;
        this.mtime=mtime;
        this.version=version;
        this.cversion=cversion;
        this.aversion=aversion;
        this.ephemeralOwner=ephemeralOwner;
        this.dataLength=dataLength;
        this.numChildren=numChildren;
        this.pzxid=pzxid;
      }
      public long getCzxid() {
        return czxid;
      }
      public void setCzxid(long m_) {
        czxid=m_;
      }
      public long getMzxid() {
        return mzxid;
      }
      public void setMzxid(long m_) {
        mzxid=m_;
      }
      public long getCtime() {
        return ctime;
      }
      public void setCtime(long m_) {
        ctime=m_;
      }
      public long getMtime() {
        return mtime;
      }
      public void setMtime(long m_) {
        mtime=m_;
      }
      public int getVersion() {
        return version;
      }
      public void setVersion(int m_) {
        version=m_;
      }
      public int getCversion() {
        return cversion;
      }
      public void setCversion(int m_) {
        cversion=m_;
      }
      public int getAversion() {
        return aversion;
      }
      public void setAversion(int m_) {
        aversion=m_;
      }
      public long getEphemeralOwner() {
        return ephemeralOwner;
      }
      public void setEphemeralOwner(long m_) {
        ephemeralOwner=m_;
      }
      public int getDataLength() {
        return dataLength;
      }
      public void setDataLength(int m_) {
        dataLength=m_;
      }
      public int getNumChildren() {
        return numChildren;
      }
      public void setNumChildren(int m_) {
        numChildren=m_;
      }
      public long getPzxid() {
        return pzxid;
      }
      public void setPzxid(long m_) {
        pzxid=m_;
      }
      public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(this,tag);
        a_.writeLong(czxid,"czxid");
        a_.writeLong(mzxid,"mzxid");
        a_.writeLong(ctime,"ctime");
        a_.writeLong(mtime,"mtime");
        a_.writeInt(version,"version");
        a_.writeInt(cversion,"cversion");
        a_.writeInt(aversion,"aversion");
        a_.writeLong(ephemeralOwner,"ephemeralOwner");
        a_.writeInt(dataLength,"dataLength");
        a_.writeInt(numChildren,"numChildren");
        a_.writeLong(pzxid,"pzxid");
        a_.endRecord(this,tag);
      }
      public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(tag);
        czxid=a_.readLong("czxid");
        mzxid=a_.readLong("mzxid");
        ctime=a_.readLong("ctime");
        mtime=a_.readLong("mtime");
        version=a_.readInt("version");
        cversion=a_.readInt("cversion");
        aversion=a_.readInt("aversion");
        ephemeralOwner=a_.readLong("ephemeralOwner");
        dataLength=a_.readInt("dataLength");
        numChildren=a_.readInt("numChildren");
        pzxid=a_.readLong("pzxid");
        a_.endRecord(tag);
      }
      public String toString() {
        try {
          java.io.ByteArrayOutputStream s =
            new java.io.ByteArrayOutputStream();
          CsvOutputArchive a_ =
            new CsvOutputArchive(s);
          a_.startRecord(this,"");
          a_.writeLong(czxid,"czxid");
          a_.writeLong(mzxid,"mzxid");
          a_.writeLong(ctime,"ctime");
          a_.writeLong(mtime,"mtime");
          a_.writeInt(version,"version");
          a_.writeInt(cversion,"cversion");
          a_.writeInt(aversion,"aversion");
          a_.writeLong(ephemeralOwner,"ephemeralOwner");
          a_.writeInt(dataLength,"dataLength");
          a_.writeInt(numChildren,"numChildren");
          a_.writeLong(pzxid,"pzxid");
          a_.endRecord(this,"");
          return new String(s.toByteArray(), "UTF-8");
        } catch (Throwable ex) {
          ex.printStackTrace();
        }
        return "ERROR";
      }
      public void write(java.io.DataOutput out) throws java.io.IOException {
        BinaryOutputArchive archive = new BinaryOutputArchive(out);
        serialize(archive, "");
      }
      public void readFields(java.io.DataInput in) throws java.io.IOException {
        BinaryInputArchive archive = new BinaryInputArchive(in);
        deserialize(archive, "");
      }
      public int compareTo (Object peer_) throws ClassCastException {
        if (!(peer_ instanceof Stat)) {
          throw new ClassCastException("Comparing different types of records.");
        }
        Stat peer = (Stat) peer_;
        int ret = 0;
        ret = (czxid == peer.czxid)? 0 :((czxid<peer.czxid)?-1:1);
        if (ret != 0) return ret;
        ret = (mzxid == peer.mzxid)? 0 :((mzxid<peer.mzxid)?-1:1);
        if (ret != 0) return ret;
        ret = (ctime == peer.ctime)? 0 :((ctime<peer.ctime)?-1:1);
        if (ret != 0) return ret;
        ret = (mtime == peer.mtime)? 0 :((mtime<peer.mtime)?-1:1);
        if (ret != 0) return ret;
        ret = (version == peer.version)? 0 :((version<peer.version)?-1:1);
        if (ret != 0) return ret;
        ret = (cversion == peer.cversion)? 0 :((cversion<peer.cversion)?-1:1);
        if (ret != 0) return ret;
        ret = (aversion == peer.aversion)? 0 :((aversion<peer.aversion)?-1:1);
        if (ret != 0) return ret;
        ret = (ephemeralOwner == peer.ephemeralOwner)? 0 :((ephemeralOwner<peer.ephemeralOwner)?-1:1);
        if (ret != 0) return ret;
        ret = (dataLength == peer.dataLength)? 0 :((dataLength<peer.dataLength)?-1:1);
        if (ret != 0) return ret;
        ret = (numChildren == peer.numChildren)? 0 :((numChildren<peer.numChildren)?-1:1);
        if (ret != 0) return ret;
        ret = (pzxid == peer.pzxid)? 0 :((pzxid<peer.pzxid)?-1:1);
        if (ret != 0) return ret;
        return ret;
      }
      public boolean equals(Object peer_) {
        if (!(peer_ instanceof Stat)) {
          return false;
        }
        if (peer_ == this) {
          return true;
        }
        Stat peer = (Stat) peer_;
        boolean ret = false;
        ret = (czxid==peer.czxid);
        if (!ret) return ret;
        ret = (mzxid==peer.mzxid);
        if (!ret) return ret;
        ret = (ctime==peer.ctime);
        if (!ret) return ret;
        ret = (mtime==peer.mtime);
        if (!ret) return ret;
        ret = (version==peer.version);
        if (!ret) return ret;
        ret = (cversion==peer.cversion);
        if (!ret) return ret;
        ret = (aversion==peer.aversion);
        if (!ret) return ret;
        ret = (ephemeralOwner==peer.ephemeralOwner);
        if (!ret) return ret;
        ret = (dataLength==peer.dataLength);
        if (!ret) return ret;
        ret = (numChildren==peer.numChildren);
        if (!ret) return ret;
        ret = (pzxid==peer.pzxid);
        if (!ret) return ret;
        return ret;
      }
      public int hashCode() {
        int result = 17;
        int ret;
        ret = (int) (czxid^(czxid>>>32));
        result = 37*result + ret;
        ret = (int) (mzxid^(mzxid>>>32));
        result = 37*result + ret;
        ret = (int) (ctime^(ctime>>>32));
        result = 37*result + ret;
        ret = (int) (mtime^(mtime>>>32));
        result = 37*result + ret;
        ret = (int)version;
        result = 37*result + ret;
        ret = (int)cversion;
        result = 37*result + ret;
        ret = (int)aversion;
        result = 37*result + ret;
        ret = (int) (ephemeralOwner^(ephemeralOwner>>>32));
        result = 37*result + ret;
        ret = (int)dataLength;
        result = 37*result + ret;
        ret = (int)numChildren;
        result = 37*result + ret;
        ret = (int) (pzxid^(pzxid>>>32));
        result = 37*result + ret;
        return result;
      }
      public static String signature() {
        return "LStat(lllliiiliil)";
      }
    }
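The `signature()` string at the end of `Stat` reads like a compact field list. Assuming `l` stands for a `long` and `i` for an `int`, which matches the field declarations one for one, a small helper can estimate how many bytes the fixed-width fields occupy in binary form. `SignatureSize` is a hypothetical helper written for this article, not part of jute, and it only understands those two letters.

```java
class SignatureSize {
    // Sums fixed-width field sizes for a signature such as "LStat(lllliiiliil)".
    static int fixedSize(String signature) {
        int open = signature.indexOf('(');
        int close = signature.lastIndexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("not a record signature: " + signature);
        }
        int size = 0;
        for (char c : signature.substring(open + 1, close).toCharArray()) {
            switch (c) {
                case 'l': // assumed to mean a long field
                    size += Long.BYTES;
                    break;
                case 'i': // assumed to mean an int field
                    size += Integer.BYTES;
                    break;
                default:
                    throw new IllegalArgumentException("unsupported field type: " + c);
            }
        }
        return size;
    }

    public static void main(String[] args) {
        System.out.println(fixedSize("LStat(lllliiiliil)") + " bytes"); // 68 bytes
    }
}
```

For `Stat` this comes to 68 bytes: six longs and five ints, in line with the eleven `writeLong` and `writeInt` calls in `serialize`.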
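The Stat class is very long. According to some performance comparisons published online, jute serializes somewhat faster than protobuf, while protobuf produces the more compact byte stream.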
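This article covered zk's internal serialization mechanism. A careful look shows that zk applies no special optimization on top of what the JDK already provides, so there is nothing particularly exotic about it. Most of the work is receiving and handling data around DataInputStream.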
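For a feel of how a hand-written field layout differs from the JDK's built-in `Serializable` mechanism, the sketch below writes the same two fields once through `ObjectOutputStream` and once through a bare `DataOutputStream`. `SizeComparison` and its nested `Bean` are hypothetical names for this example, and the hand-written layout (an int, then a length-prefixed UTF-8 string) is an assumption about what a binary archive emits rather than jute's own code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class SizeComparison {
    // Hypothetical bean used only for this comparison.
    static class Bean implements Serializable {
        private static final long serialVersionUID = 1L;
        final int number;
        final String name;

        Bean(int number, String name) {
            this.number = number;
            this.name = name;
        }
    }

    // JDK serialization: the stream also carries class and field metadata.
    static int jdkSize(Bean bean) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(sink)) {
                out.writeObject(bean);
            }
            return sink.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hand-written layout: only the field values, in a fixed order.
    static int dataStreamSize(Bean bean) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(sink);
            out.writeInt(bean.number);
            byte[] utf8 = bean.name.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length);
            out.write(utf8);
            return sink.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Bean bean = new Bean(1, "idea");
        System.out.println("ObjectOutputStream bytes: " + jdkSize(bean));
        System.out.println("DataOutputStream bytes:   " + dataStreamSize(bean));
    }
}
```

The hand-written stream stays at 12 bytes for this bean, while the `ObjectOutputStream` result is larger because it describes the class as well as the values. The price is that reader and writer must agree on the field order out of band.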
You may also want to revisit an earlier article I wrote on serialization; comparing the two may give you some new insights.