44-微服务技术栈(高级):分布式协调服务zookeeper源码篇(持久化FileTxnLog)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。

一、前言

  前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。

二、持久化总体框架

  持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。

  


· TxnLog,接口类型,读取事务性日志的接口。

  · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。

  · Snapshot,接口类型,持久层快照接口。

  · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。

  · FileTxnSnapLog,封装了TxnLog和SnapShot。

  · Util,工具类,提供持久化所需的API。

  下面先来分析TxnLog和FileTxnLog的源码。

三、TxnLog源码分析

  TxnLog是接口,规定了对日志的响应操作。

public interface TxnLog {

   

   /**

    * roll the current

    * log being appended to

    * @throws IOException

    */

   // 回滚日志

   void rollLog() throws IOException;

   /**

    * Append a request to the transaction log

    * @param hdr the transaction header

    * @param r the transaction itself

    * returns true iff something appended, otw false

    * @throws IOException

    */

   // 添加一个请求至事务性日志

   boolean append(TxnHeader hdr, Record r) throws IOException;


   /**

    * Start reading the transaction logs

    * from a given zxid

    * @param zxid

    * @return returns an iterator to read the

    * next transaction in the logs.

    * @throws IOException

    */

   // 读取事务性日志

   TxnIterator read(long zxid) throws IOException;

   

   /**

    * the last zxid of the logged transactions.

    * @return the last zxid of the logged transactions.

    * @throws IOException

    */

   // 事务性操作的最新zxid

   long getLastLoggedZxid() throws IOException;

   

   /**

    * truncate the log to get in sync with the

    * leader.

    * @param zxid the zxid to truncate at.

    * @throws IOException

    */

   // 清空日志,与Leader保持同步

   boolean truncate(long zxid) throws IOException;

   

   /**

    * the dbid for this transaction log.

    * @return the dbid for this transaction log.

    * @throws IOException

    */

   // 获取数据库的id

   long getDbId() throws IOException;

   

   /**

    * commmit the trasaction and make sure

    * they are persisted

    * @throws IOException

    */

   // 提交事务并进行确认

   void commit() throws IOException;

 

   /**

    * close the transactions logs

    */

   // 关闭事务性日志

   void close() throws IOException;

   /**

    * an iterating interface for reading

    * transaction logs.

    */

   // 读取事务日志的迭代器接口

   public interface TxnIterator {

       /**

        * return the transaction header.

        * @return return the transaction header.

        */

       // 获取事务头部

       TxnHeader getHeader();

       

       /**

        * return the transaction record.

        * @return return the transaction record.

        */

       // 获取事务

       Record getTxn();

   

       /**

        * go to the next transaction record.

        * @throws IOException

        */

       // 下个事务

       boolean next() throws IOException;

       

       /**

        * close files and release the

        * resources

        * @throws IOException

        */

       // 关闭文件释放资源

       void close() throws IOException;

   }

}

其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。

四、FileTxnLog源码分析

  对于LogFile而言,其格式可分为如下三部分

  LogFile:

    FileHeader TxnList ZeroPad

  FileHeader格式如下  

  FileHeader: {

    magic 4bytes (ZKLG)

    version 4bytes

    dbid 8bytes

  }

  TxnList格式如下

  TxnList:

    Txn || Txn TxnList

  Txn格式如下

  Txn:

    checksum Txnlen TxnHeader Record 0x42

  Txnlen格式如下

  Txnlen:

    len 4bytes

  TxnHeader格式如下

  TxnHeader: {

    sessionid 8bytes

    cxid 4bytes

     zxid 8bytes

    time 8bytes

    type 4bytes

  }

  ZeroPad格式如下

  ZeroPad:

    0 padded to EOF (filled during preallocation stage)

  了解LogFile的格式对于理解源码会有很大的帮助。

4.1 属性 

public class FileTxnLog implements TxnLog {

   private static final Logger LOG;

   

   // 预分配大小 64M

   static long preAllocSize =  65536 * 1024;

   

   // 魔术数字,默认为1514884167

   public final static int TXNLOG_MAGIC =

       ByteBuffer.wrap("ZKLG".getBytes()).getInt();


   // 版本号

   public final static int VERSION = 2;


   /** Maximum time we allow for elapsed fsync before WARNing */

   // 进行同步时,发出warn之前所能等待的最长时间

   private final static long fsyncWarningThresholdMS;


   // 静态属性,确定Logger、预分配空间大小和最长时间

   static {

       LOG = LoggerFactory.getLogger(FileTxnLog.class);


       String size = System.getProperty("zookeeper.preAllocSize");

       if (size != null) {

           try {

               preAllocSize = Long.parseLong(size) * 1024;

           } catch (NumberFormatException e) {

               LOG.warn(size + " is not a valid value for preAllocSize");

           }

       }

       fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);

   }

   

   // 最大(新)的zxid

   long lastZxidSeen;

   // 存储数据相关的流

   volatile BufferedOutputStream logStream = null;

   volatile OutputArchive oa;

   volatile FileOutputStream fos = null;


   // log目录文件

   File logDir;

   

   // 是否强制同步

   private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;

   

   // 数据库id

   long dbId;

   

   // 流列表

   private LinkedList streamsToFlush = new LinkedList();

   

   // 当前大小

   long currentSize;

   // 写日志文件

   File logFileWrite = null;

}

4.2. 核心函数 

1. append函数

public synchronized boolean append(TxnHeader hdr, Record txn)

       throws IOException

   {

       if (hdr != null) { // 事务头部不为空

           if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid

               LOG.warn("Current zxid " + hdr.getZxid()

                       + " is <= " + lastZxidSeen + " for "

                       + hdr.getType());

           }

           if (logStream==null) { // 日志流为空

              if(LOG.isInfoEnabled()){

                   LOG.info("Creating new log file: log." +  

                           Long.toHexString(hdr.getZxid()));

              }

             

              //

              logFileWrite = new File(logDir, ("log." +

                      Long.toHexString(hdr.getZxid())));

              fos = new FileOutputStream(logFileWrite);

              logStream=new BufferedOutputStream(fos);

              oa = BinaryOutputArchive.getArchive(logStream);

              //

              FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);

              // 序列化

              fhdr.serialize(oa, "fileheader");

              // Make sure that the magic number is written before padding.

              // 刷新到磁盘

              logStream.flush();

             

              // 当前通道的大小

              currentSize = fos.getChannel().position();

              // 添加fos

              streamsToFlush.add(fos);

           }

           

           // 填充文件

           padFile(fos);

           

           // Serializes transaction header and transaction data into a byte buffer.

           // 将事务头和事务数据序列化成Byte Buffer

           byte[] buf = Util.marshallTxnEntry(hdr, txn);

           if (buf == null || buf.length == 0) { // 为空,抛出异常

               throw new IOException("Faulty serialization for header " +

                       "and txn");

           }

           // 生成一个验证算法

           Checksum crc = makeChecksumAlgorithm();

           // Updates the current checksum with the specified array of bytes

           // 使用Byte数组来更新当前的Checksum

           crc.update(buf, 0, buf.length);

           // 写long类型数据

           oa.writeLong(crc.getValue(), "txnEntryCRC");

           // Write the serialized transaction record to the output archive.

           // 将序列化的事务记录写入OutputArchive

           Util.writeTxnBytes(oa, buf);

           

           return true;

       }

       return false;

   }

说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下

  ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false

  ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤

  ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④

  ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤

  ⑤ 填充数据,填充0,进入⑥

  ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦

  ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧

  ⑧ 将更新的ByteBuffer写入磁盘文件,返回true

append间接调用了padLog函数,其源码如下 

public static long padLogFile(FileOutputStream f,long currentSize,

                             long preAllocSize) throws IOException{

   // 获取位置

   long position = f.getChannel().position();

   if (position + 4096 >= currentSize) { // 计算后是否大于当前大小

       // 重新设置当前大小,剩余部分填充0

       currentSize = currentSize + preAllocSize;

       fill.position(0);

       f.getChannel().write(fill, currentSize-fill.remaining());

   }

   return currentSize;

}

说明:padLog其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小

2. getLogFiles函数 

public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {

   // 按照zxid对文件进行排序

   List files = Util.sortDataDir(logDirList, "log", true);

   long logZxid = 0;

   // Find the log file that starts before or at the same time as the

   // zxid of the snapshot

   for (File f : files) { // 遍历文件

       // 从文件中获取zxid

       long fzxid = Util.getZxidFromName(f.getName(), "log");

       if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件

           continue;

       }

       // the files

       // are sorted with zxid's

       if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid)

           logZxid = fzxid;

       }

   }

   // 文件列表

   List v=new ArrayList(5);

   for (File f : files) { // 再次遍历文件

       // 从文件中获取zxid

       long fzxid = Util.getZxidFromName(f.getName(), "log");

       if (fzxid < logZxid) { // 跳过小于logZxid的文件

           continue;

       }

       // 添加

       v.add(f);

   }

   // 转化成File[] 类型后返回

   return v.toArray(new File[0]);


}

说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。

  ① 对所有log文件按照zxid进行升序排序,进入②

  ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③

   ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④

  ④ 转化后返回

getLogFiles函数调用了sortDataDir,其源码如下

public static List sortDataDir(File[] files, String prefix, boolean ascending)

{

   if(files==null)

       return new ArrayList(0);

   // 转化为列表

   List filelist = Arrays.asList(files);

   // 进行排序,Comparator是关键,根据zxid进行排序

   Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));

   return filelist;

}

说明:getLogFiles其用于排序log文件,可以选择根据zxid进行升序或降序

getLogFiles函数间接调用了getZxidFromName,其源码如下: 

// 从文件名中解析出zxid

public static long getZxidFromName(String name, String prefix) {

   long zxid = -1;

   // 对文件名进行分割

   String nameParts[] = name.split("\\.");

   if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同

       try {

           // 转化成长整形

           zxid = Long.parseLong(nameParts[1], 16);

       } catch (NumberFormatException e) {

       }

   }

   return zxid;

}

说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始

3. getLastLoggedZxid函数 

public long getLastLoggedZxid() {

   // 获取已排好序的所有的log文件

   File[] files = getLogFiles(logDir.listFiles(), 0);

   // 获取最大的zxid(最后一个log文件对应的zxid)

   long maxLog=files.length>0?

       Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;


   // if a log file is more recent we must scan it to find

   // the highest zxid

   //

   long zxid = maxLog;

   // 迭代器

   TxnIterator itr = null;

   try {

       // 新生FileTxnLog

       FileTxnLog txn = new FileTxnLog(logDir);

       // 开始读取从给定zxid之后的所有事务

       itr = txn.read(maxLog);

       while (true) { // 遍历

           if(!itr.next()) // 是否存在下一项

               break;

           // 获取事务头

           TxnHeader hdr = itr.getHeader();

           // 获取zxid

           zxid = hdr.getZxid();

       }

   } catch (IOException e) {

       LOG.warn("Unexpected exception", e);

   } finally {

       // 关闭迭代器

       close(itr);

   }

   return zxid;

}

说明:该函数主要用于获取记录在log中的最后一个zxid。其步骤大致如下

  ① 获取已排好序的所有log文件,并从最后一个文件中取出zxid作为候选的最大zxid,进入②

  ② 新生成FileTxnLog并读取步骤①中zxid之后的所有事务,进入③

  ③ 遍历所有事务并提取出相应的zxid,最后返回。

其中getLastLoggedZxid调用了read函数,其源码如下 

public TxnIterator read(long zxid) throws IOException {

   // 返回事务文件访问迭代器

   return new FileTxnIterator(logDir, zxid);

}

说明:read函数会生成一个FileTxnIterator,其是TxnLog.TxnIterator的子类,之后在FileTxnIterator构造函数中会调用init函数,其源码如下

void init() throws IOException {

   // 新生成文件列表

   storedFiles = new ArrayList();

   // 进行排序

   List files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);

   for (File f: files) { // 遍历文件

       if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大于等于指定zxid的文件

           storedFiles.add(f);

       }

       // add the last logfile that is less than the zxid

       else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一个zxid小于指定zxid的文件,然后退出

           storedFiles.add(f);

           break;

       }

   }

   // go to the next logfile

   // 进入下一个log文件

   goToNextLog();

   if (!next()) // 不存在下一项,返回

       return;

   while (hdr.getZxid() < zxid) { // 从事务头中获取zxid小于给定zxid,直到不存在下一项或者大于给定zxid时退出

       if (!next())

           return;

   }

}

说明:init函数用于进行初始化操作,会根据zxid的不同进行不同的初始化操作,在init函数中会调用goToNextLog函数,其源码如下  

private boolean goToNextLog() throws IOException {

   if (storedFiles.size() > 0) { // 存储的文件列表大于0

       // 取最后一个log文件

       this.logFile = storedFiles.remove(storedFiles.size()-1);

       // 针对该文件,创建InputArchive

       ia = createInputArchive(this.logFile);

       // 返回true

       return true;

   }

   return false;

}

说明:goToNextLog表示选取下一个log文件,在init函数中还调用了next函数,其源码如下  

public boolean next() throws IOException {

   if (ia == null) { // 为空,返回false

       return false;

   }

   try {

       // 读取长整形crcValue

       long crcValue = ia.readLong("crcvalue");

       // 通过input archive读取一个事务条目

       byte[] bytes = Util.readTxnBytes(ia);

       // Since we preallocate, we define EOF to be an

       if (bytes == null || bytes.length==0) { // 对bytes进行判断

           throw new EOFException("Failed to read " + logFile);

       }

       // EOF or corrupted record

       // validate CRC

       // 验证CRC

       Checksum crc = makeChecksumAlgorithm();

       // 更新

       crc.update(bytes, 0, bytes.length);

       if (crcValue != crc.getValue()) // 验证不相等,抛出异常

           throw new IOException(CRC_ERROR);

       if (bytes == null || bytes.length == 0) // bytes为空,返回false

           return false;

       // 新生成TxnHeader

       hdr = new TxnHeader();

       // 将Txn反序列化,并且将对应的TxnHeader反序列化至hdr,整个Record反序列化至record

       record = SerializeUtils.deserializeTxn(bytes, hdr);

   } catch (EOFException e) { // 抛出异常

       LOG.debug("EOF excepton " + e);

       // 关闭输入流

       inputStream.close();

       // 赋值为null

       inputStream = null;

       ia = null;

       hdr = null;

       // this means that the file has ended

       // we should go to the next file

       if (!goToNextLog()) { // 没有log文件,则返回false

           return false;

       }

       // if we went to the next log file, we should call next() again

       // 继续调用next

       return next();

   } catch (IOException e) {

       inputStream.close();

       throw e;

   }

   // 返回true

   return true;

}

说明:next表示将迭代器移动至下一个事务,方便读取,next函数的步骤如下。

  ① 读取事务的crcValue值,用于后续的验证,进入②

  ② 读取事务,使用CRC32进行更新并与①中的结果进行比对,若不相同,则抛出异常,否则,进入③

  ③ 将事务进行反序列化并保存至相应的属性中(如事务头和事务体),会确定具体的事务操作类型。

  ④ 在读取过程抛出异常时,会首先关闭流,然后再尝试调用next函数(即进入下一个事务进行读取)。

4. commit函数  

public synchronized void commit() throws IOException {

   if (logStream != null) {

       // 强制刷到磁盘

       logStream.flush();

   }

   for (FileOutputStream log : streamsToFlush) { // 遍历流

       // 强制刷到磁盘

       log.flush();

       if (forceSync) { // 是否强制同步

           long startSyncNS = System.nanoTime();


           log.getChannel().force(false);

           // 计算流式的时间

           long syncElapsedMS =

               TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);

           if (syncElapsedMS > fsyncWarningThresholdMS) { // 大于阈值时则会警告

               LOG.warn("fsync-ing the write ahead log in "

                        + Thread.currentThread().getName()

                        + " took " + syncElapsedMS

                        + "ms which will adversely effect operation latency. "

                        + "See the ZooKeeper troubleshooting guide");

           }

       }

   }

   while (streamsToFlush.size() > 1) { // 移除流并关闭

       streamsToFlush.removeFirst().close();

   }

}

说明:该函数主要用于提交事务日志至磁盘,其大致步骤如下

  ① 若日志流logStream不为空,则强制刷新至磁盘,进入②

  ② 遍历需要刷新至磁盘的所有流streamsToFlush并进行刷新,进入③

  ③ 判断是否需要强制性同步,如是,则计算每个流的流式时间并在控制台给出警告,进入④

  ④ 移除所有流并关闭。

5. truncate函数 

public boolean truncate(long zxid) throws IOException {

   FileTxnIterator itr = null;

   try {

       // 获取迭代器

       itr = new FileTxnIterator(this.logDir, zxid);

       PositionInputStream input = itr.inputStream;

       long pos = input.getPosition();

       // now, truncate at the current position

       // 从当前位置开始清空

       RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");

       raf.setLength(pos);

       raf.close();

       while (itr.goToNextLog()) { // 存在下一个log文件

           if (!itr.logFile.delete()) { // 删除

               LOG.warn("Unable to truncate {}", itr.logFile);

           }

       }

   } finally {

       // 关闭迭代器

       close(itr);

   }

   return true;

}

说明:该函数用于清空大于给定zxid的所有事务日志。

五、总结

  对于持久化中的TxnLog和FileTxnLog的源码分析就已经完成了,本章节需重点记住:

  • append函数实现日志追加,记录
  • 通过事务的crcValue验证,决定是否更新
  • 通过getLogFiles获取全部日志文件并排序
  • 通过getLastLoggedZxid找到最大的zxid,保证后续函数决定下一个日志文件id
  • 通过commit提交,真正生成日志文件
  • 通过trancate清空指定事务日志
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
13天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
303 2
|
13天前
|
消息中间件 算法 Java
【亿级数据专题】「分布式服务框架」 盘点本年度我们探索服务的保障容量的三大关键方案实现
【亿级数据专题】「分布式服务框架」 盘点本年度我们探索服务的保障容量的三大关键方案实现
193 0
|
7天前
|
Cloud Native 数据管理 关系型数据库
【阿里云云原生专栏】云原生数据管理:阿里云数据库服务的分布式实践
【5月更文挑战第21天】阿里云数据库服务在云原生时代展现优势,应对分布式数据管理挑战。PolarDB等服务保证高可用和弹性,通过多副本机制和分布式事务确保数据一致性和可靠性。示例代码展示了在阿里云数据库上进行分布式事务操作。此外,丰富的监控工具协助用户管理数据库性能,支持企业的数字化转型和业务增长。
179 1
|
13天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
13天前
使用JWT的服务分布式部署之后报错:JWT Check Failure:
使用JWT的服务分布式部署之后报错:JWT Check Failure:
54 1
|
13天前
|
存储 Linux 数据库
ZooKeeper【搭建 01】apache-zookeeper-3.6.2 单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置(一篇入门zookeeper)
【4月更文挑战第8天】ZooKeeper【搭建 01】apache-zookeeper-3.6.2 单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置(一篇入门zookeeper)
41 0
|
13天前
|
缓存 前端开发 小程序
【分布式技术专题】「架构设计方案」盘点和总结RBAC服务体系的功能设计及注意事项技术体系
【分布式技术专题】「架构设计方案」盘点和总结RBAC服务体系的功能设计及注意事项技术体系
38 0
|
13天前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
205 0
|
13天前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
274 0
|
13天前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
289 1

相关产品

  • 微服务引擎