Zookeeper持久化FileTxnLog

简介: 本文深入分析Zookeeper持久化机制的核心源码,重点解析TxnLog接口及其实现类FileTxnLog。涵盖事务日志的追加、读取、提交与截断过程,详解日志文件结构、CRC校验、预分配机制及zxid管理,揭示数据持久化的实现原理与关键流程。

一、前言
  前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。
二、持久化总体框架
  持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。
  
  
· TxnLog,接口类型,读取事务性日志的接口。
  · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。
  · Snapshot,接口类型,持久层快照接口。
  · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
  · FileTxnSnapLog,封装了TxnLog和SnapShot。
  · Util,工具类,提供持久化所需的API。

  下面先来分析TxnLog和FileTxnLog的源码。
三、TxnLog源码分析
  TxnLog是接口,规定了对日志的响应操作。
public interface TxnLog {

/**
 * roll the current
 * log being appended to
 * @throws IOException 
 */
// 回滚日志
void rollLog() throws IOException;
/**
 * Append a request to the transaction log
 * @param hdr the transaction header
 * @param r the transaction itself
 * returns true iff something appended, otw false 
 * @throws IOException
 */
// 添加一个请求至事务性日志
boolean append(TxnHeader hdr, Record r) throws IOException;

/**
 * Start reading the transaction logs
 * from a given zxid
 * @param zxid
 * @return returns an iterator to read the 
 * next transaction in the logs.
 * @throws IOException
 */
// 读取事务性日志
TxnIterator read(long zxid) throws IOException;

/**
 * the last zxid of the logged transactions.
 * @return the last zxid of the logged transactions.
 * @throws IOException
 */
// 事务性操作的最新zxid
long getLastLoggedZxid() throws IOException;

/**
 * truncate the log to get in sync with the 
 * leader.
 * @param zxid the zxid to truncate at.
 * @throws IOException 
 */
// 清空日志,与Leader保持同步
boolean truncate(long zxid) throws IOException;

/**
 * the dbid for this transaction log. 
 * @return the dbid for this transaction log.
 * @throws IOException
 */
// 获取数据库的id
long getDbId() throws IOException;

/**
 * commmit the trasaction and make sure
 * they are persisted
 * @throws IOException
 */
// 提交事务并进行确认
void commit() throws IOException;

/** 
 * close the transactions logs
 */
// 关闭事务性日志
void close() throws IOException;
/**
 * an iterating interface for reading 
 * transaction logs. 
 */
// 读取事务日志的迭代器接口
public interface TxnIterator {
    /**
     * return the transaction header.
     * @return return the transaction header.
     */
    // 获取事务头部
    TxnHeader getHeader();

    /**
     * return the transaction record.
     * @return return the transaction record.
     */
    // 获取事务
    Record getTxn();

    /**
     * go to the next transaction record.
     * @throws IOException
     */
    // 下个事务
    boolean next() throws IOException;

    /**
     * close files and release the 
     * resources
     * @throws IOException
     */
    // 关闭文件释放资源
    void close() throws IOException;
}

}
其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。
四、FileTxnLog源码分析
  对于LogFile而言,其格式可分为如下三部分
  LogFile:
    FileHeader TxnList ZeroPad
  FileHeader格式如下  
  FileHeader: {
    magic 4bytes (ZKLG)
    version 4bytes
    dbid 8bytes
  }
  TxnList格式如下
  TxnList:
    Txn || Txn TxnList
  Txn格式如下
  Txn:
    checksum Txnlen TxnHeader Record 0x42
  Txnlen格式如下
  Txnlen:
    len 4bytes
  TxnHeader格式如下
  TxnHeader: {
    sessionid 8bytes
    cxid 4bytes
    zxid 8bytes
    time 8bytes
    type 4bytes
  }
  ZeroPad格式如下
  ZeroPad:
    0 padded to EOF (filled during preallocation stage)
  了解LogFile的格式对于理解源码会有很大的帮助。
4.1 属性 
public class FileTxnLog implements TxnLog {
private static final Logger LOG;

// 预分配大小 64M
static long preAllocSize =  65536 * 1024;

// 魔术数字,默认为1514884167
public final static int TXNLOG_MAGIC =
    ByteBuffer.wrap("ZKLG".getBytes()).getInt();

// 版本号
public final static int VERSION = 2;

/** Maximum time we allow for elapsed fsync before WARNing */
// 进行同步时,发出warn之前所能等待的最长时间
private final static long fsyncWarningThresholdMS;

// 静态属性,确定Logger、预分配空间大小和最长时间
static {
    LOG = LoggerFactory.getLogger(FileTxnLog.class);

    String size = System.getProperty("zookeeper.preAllocSize");
    if (size != null) {
        try {
            preAllocSize = Long.parseLong(size) * 1024;
        } catch (NumberFormatException e) {
            LOG.warn(size + " is not a valid value for preAllocSize");
        }
    }
    fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
}

// 最大(新)的zxid
long lastZxidSeen;
// 存储数据相关的流
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;

// log目录文件
File logDir;

// 是否强制同步
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;

// 数据库id
long dbId;

// 流列表
private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();

// 当前大小
long currentSize;
// 写日志文件
File logFileWrite = null;

}
4.2. 核心函数 

  1. append函数
    public synchronized boolean append(TxnHeader hdr, Record txn)

     throws IOException
    

    {

     if (hdr != null) { // 事务头部不为空
         if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid
             LOG.warn("Current zxid " + hdr.getZxid()
                     + " is <= " + lastZxidSeen + " for "
                     + hdr.getType());
         }
         if (logStream==null) { // 日志流为空
            if(LOG.isInfoEnabled()){
                 LOG.info("Creating new log file: log." +  
                         Long.toHexString(hdr.getZxid()));
            }
    
            // 
            logFileWrite = new File(logDir, ("log." + 
                    Long.toHexString(hdr.getZxid())));
            fos = new FileOutputStream(logFileWrite);
            logStream=new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            // 
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
            // 序列化
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            // 刷新到磁盘
            logStream.flush();
    
            // 当前通道的大小
            currentSize = fos.getChannel().position();
            // 添加fos
            streamsToFlush.add(fos);
         }
    
         // 填充文件
         padFile(fos);
    
         // Serializes transaction header and transaction data into a byte buffer.
         // 将事务头和事务数据序列化成Byte Buffer
         byte[] buf = Util.marshallTxnEntry(hdr, txn);
         if (buf == null || buf.length == 0) { // 为空,抛出异常
             throw new IOException("Faulty serialization for header " +
                     "and txn");
         }
         // 生成一个验证算法
         Checksum crc = makeChecksumAlgorithm();
         // Updates the current checksum with the specified array of bytes
         // 使用Byte数组来更新当前的Checksum
         crc.update(buf, 0, buf.length);
         // 写long类型数据
         oa.writeLong(crc.getValue(), "txnEntryCRC");
         // Write the serialized transaction record to the output archive.
         // 将序列化的事务记录写入OutputArchive
         Util.writeTxnBytes(oa, buf);
    
         return true;
     }
     return false;
    

    }
    说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下
      ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false
      ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤
      ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④
      ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤
      ⑤ 填充数据,填充0,进入⑥
      ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦
      ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧
      ⑧ 将更新的ByteBuffer写入磁盘文件,返回true
    append间接调用了padLog函数,其源码如下 
    public static long padLogFile(FileOutputStream f,long currentSize,

                           long preAllocSize) throws IOException{
    

    // 获取位置
    long position = f.getChannel().position();
    if (position + 4096 >= currentSize) { // 计算后是否大于当前大小

     // 重新设置当前大小,剩余部分填充0
     currentSize = currentSize + preAllocSize;
     fill.position(0);
     f.getChannel().write(fill, currentSize-fill.remaining());
    

    }
    return currentSize;
    }
    说明:padLog其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。

  2. getLogFiles函数 
    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
    // 按照zxid对文件进行排序
    List files = Util.sortDataDir(logDirList, "log", true);
    long logZxid = 0;
    // Find the log file that starts before or at the same time as the
    // zxid of the snapshot
    for (File f : files) { // 遍历文件
     // 从文件中获取zxid
     long fzxid = Util.getZxidFromName(f.getName(), "log");
     if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件
         continue;
     }
     // the files
     // are sorted with zxid's
     if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid)
         logZxid = fzxid;
     }
    
    }
    // 文件列表
    List v=new ArrayList(5);
    for (File f : files) { // 再次遍历文件
     // 从文件中获取zxid
     long fzxid = Util.getZxidFromName(f.getName(), "log");
     if (fzxid < logZxid) { // 跳过小于logZxid的文件
         continue;
     }
     // 添加
     v.add(f);
    
    }
    // 转化成File[] 类型后返回
    return v.toArray(new File[0]);

}
说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。
  ① 对所有log文件按照zxid进行升序排序,进入②
  ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③
  ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④
  ④ 转化后返回
getLogFiles函数调用了sortDataDir,其源码如下
public static List sortDataDir(File[] files, String prefix, boolean ascending)
{
if(files==null)
return new ArrayList(0);
// 转化为列表
List filelist = Arrays.asList(files);
// 进行排序,Comparator是关键,根据zxid进行排序
Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
return filelist;
}
说明:getLogFiles其用于排序log文件,可以选择根据zxid进行升序或降序。
getLogFiles函数间接调用了getZxidFromName,其源码如下: 
// 从文件名中解析出zxid
public static long getZxidFromName(String name, String prefix) {
long zxid = -1;
// 对文件名进行分割
String nameParts[] = name.split("\.");
if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同
try {
// 转化成长整形
zxid = Long.parseLong(nameParts[1], 16);
} catch (NumberFormatException e) {
}
}
return zxid;
}
说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。

  1. getLastLoggedZxid函数 
    public long getLastLoggedZxid() {
    // 获取已排好序的所有的log文件
    File[] files = getLogFiles(logDir.listFiles(), 0);
    // 获取最大的zxid(最后一个log文件对应的zxid)
    long maxLog=files.length>0?

     Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
    

    // if a log file is more recent we must scan it to find
    // the highest zxid
    //
    long zxid = maxLog;
    // 迭代器
    TxnIterator itr = null;
    try {

     // 新生FileTxnLog
     FileTxnLog txn = new FileTxnLog(logDir);
     // 开始读取从给定zxid之后的所有事务
     itr = txn.read(maxLog);
     while (true) { // 遍历
         if(!itr.next()) // 是否存在下一项
             break;
         // 获取事务头
         TxnHeader hdr = itr.getHeader();
         // 获取zxid
         zxid = hdr.getZxid();
     }
    

    } catch (IOException e) {

     LOG.warn("Unexpected exception", e);
    

    } finally {

     // 关闭迭代器
     close(itr);
    

    }
    return zxid;
    }
    说明:该函数主要用于获取记录在log中的最后一个zxid。其步骤大致如下
      ① 获取已排好序的所有log文件,并从最后一个文件中取出zxid作为候选的最大zxid,进入②
      ② 新生成FileTxnLog并读取步骤①中zxid之后的所有事务,进入③
      ③ 遍历所有事务并提取出相应的zxid,最后返回。
    其中getLastLoggedZxid调用了read函数,其源码如下 
    public TxnIterator read(long zxid) throws IOException {
    // 返回事务文件访问迭代器
    return new FileTxnIterator(logDir, zxid);
    }
    说明:read函数会生成一个FileTxnIterator,其是TxnLog.TxnIterator的子类,之后在FileTxnIterator构造函数中会调用init函数,其源码如下
    void init() throws IOException {
    // 新生成文件列表
    storedFiles = new ArrayList();
    // 进行排序
    List files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
    for (File f: files) { // 遍历文件

     if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大于等于指定zxid的文件
         storedFiles.add(f);
     }
     // add the last logfile that is less than the zxid
     else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一个zxid小于指定zxid的文件,然后退出
         storedFiles.add(f);
         break;
     }
    

    }
    // go to the next logfile
    // 进入下一个log文件
    goToNextLog();
    if (!next()) // 不存在下一项,返回

     return;
    

    while (hdr.getZxid() < zxid) { // 从事务头中获取zxid小于给定zxid,直到不存在下一项或者大于给定zxid时退出

     if (!next())
         return;
    

    }
    }
    说明:init函数用于进行初始化操作,会根据zxid的不同进行不同的初始化操作,在init函数中会调用goToNextLog函数,其源码如下  
    private boolean goToNextLog() throws IOException {
    if (storedFiles.size() > 0) { // 存储的文件列表大于0

     // 取最后一个log文件
     this.logFile = storedFiles.remove(storedFiles.size()-1);
     // 针对该文件,创建InputArchive
     ia = createInputArchive(this.logFile);
     // 返回true
     return true;
    

    }
    return false;
    }
    说明:goToNextLog表示选取下一个log文件,在init函数中还调用了next函数,其源码如下  
    public boolean next() throws IOException {
    if (ia == null) { // 为空,返回false

     return false;
    

    }
    try {

     // 读取长整形crcValue
     long crcValue = ia.readLong("crcvalue");
     // 通过input archive读取一个事务条目
     byte[] bytes = Util.readTxnBytes(ia);
     // Since we preallocate, we define EOF to be an
     if (bytes == null || bytes.length==0) { // 对bytes进行判断
         throw new EOFException("Failed to read " + logFile);
     }
     // EOF or corrupted record
     // validate CRC
     // 验证CRC
     Checksum crc = makeChecksumAlgorithm();
     // 更新
     crc.update(bytes, 0, bytes.length);
     if (crcValue != crc.getValue()) // 验证不相等,抛出异常
         throw new IOException(CRC_ERROR);
     if (bytes == null || bytes.length == 0) // bytes为空,返回false
         return false;
     // 新生成TxnHeader
     hdr = new TxnHeader();
     // 将Txn反序列化,并且将对应的TxnHeader反序列化至hdr,整个Record反序列化至record
     record = SerializeUtils.deserializeTxn(bytes, hdr);
    

    } catch (EOFException e) { // 抛出异常

     LOG.debug("EOF excepton " + e);
     // 关闭输入流
     inputStream.close();
     // 赋值为null
     inputStream = null;
     ia = null;
     hdr = null;
     // this means that the file has ended
     // we should go to the next file
     if (!goToNextLog()) { // 没有log文件,则返回false
         return false;
     }
     // if we went to the next log file, we should call next() again
     // 继续调用next
     return next();
    

    } catch (IOException e) {

     inputStream.close();
     throw e;
    

    }
    // 返回true
    return true;
    }
    说明:next表示将迭代器移动至下一个事务,方便读取,next函数的步骤如下。
      ① 读取事务的crcValue值,用于后续的验证,进入②
      ② 读取事务,使用CRC32进行更新并与①中的结果进行比对,若不相同,则抛出异常,否则,进入③
      ③ 将事务进行反序列化并保存至相应的属性中(如事务头和事务体),会确定具体的事务操作类型。
      ④ 在读取过程抛出异常时,会首先关闭流,然后再尝试调用next函数(即进入下一个事务进行读取)。

  2. commit函数  
    public synchronized void commit() throws IOException {
    if (logStream != null) {

     // 强制刷到磁盘
     logStream.flush();
    

    }
    for (FileOutputStream log : streamsToFlush) { // 遍历流

     // 强制刷到磁盘
     log.flush();
     if (forceSync) { // 是否强制同步
         long startSyncNS = System.nanoTime();
    
         log.getChannel().force(false);
         // 计算流式的时间
         long syncElapsedMS =
             TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
         if (syncElapsedMS > fsyncWarningThresholdMS) { // 大于阈值时则会警告
             LOG.warn("fsync-ing the write ahead log in "
                      + Thread.currentThread().getName()
                      + " took " + syncElapsedMS
                      + "ms which will adversely effect operation latency. "
                      + "See the ZooKeeper troubleshooting guide");
         }
     }
    

    }
    while (streamsToFlush.size() > 1) { // 移除流并关闭

     streamsToFlush.removeFirst().close();
    

    }
    }
    说明:该函数主要用于提交事务日志至磁盘,其大致步骤如下
      ① 若日志流logStream不为空,则强制刷新至磁盘,进入②
      ② 遍历需要刷新至磁盘的所有流streamsToFlush并进行刷新,进入③
      ③ 判断是否需要强制性同步,如是,则计算每个流的流式时间并在控制台给出警告,进入④
      ④ 移除所有流并关闭。

  3. truncate函数 
    public boolean truncate(long zxid) throws IOException {
    FileTxnIterator itr = null;
    try {
     // 获取迭代器
     itr = new FileTxnIterator(this.logDir, zxid);
     PositionInputStream input = itr.inputStream;
     long pos = input.getPosition();
     // now, truncate at the current position
     // 从当前位置开始清空
     RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
     raf.setLength(pos);
     raf.close();
     while (itr.goToNextLog()) { // 存在下一个log文件
         if (!itr.logFile.delete()) { // 删除
             LOG.warn("Unable to truncate {}", itr.logFile);
         }
     }
    
    } finally {
     // 关闭迭代器
     close(itr);
    
    }
    return true;
    }
    说明:该函数用于清空大于给定zxid的所有事务日志。
    五、总结
      对于持久化中的TxnLog和FileTxnLog的源码分析就已经完成了,本章节需重点记住:
    ● append函数实现日志追加,记录
    ● 通过事务的crcValue验证,决定是否更新
    ● 通过getLogFiles获取全部日志文件并排序
    ● 通过getLastLoggedZxid找到最大的zxid,保证后续函数决定下一个日志文件id
    ● 通过commit提交,真正生成日志文件
    ● 通过trancate清空指定事务日志
相关文章
|
13天前
|
数据采集 人工智能 安全
|
8天前
|
编解码 人工智能 自然语言处理
⚽阿里云百炼通义万相 2.6 视频生成玩法手册
通义万相Wan 2.6是全球首个支持角色扮演的AI视频生成模型,可基于参考视频形象与音色生成多角色合拍、多镜头叙事的15秒长视频,实现声画同步、智能分镜,适用于影视创作、营销展示等场景。
639 4
|
8天前
|
机器学习/深度学习 人工智能 前端开发
构建AI智能体:七十、小树成林,聚沙成塔:随机森林与大模型的协同进化
随机森林是一种基于决策树的集成学习算法,通过构建多棵决策树并结合它们的预测结果来提高准确性和稳定性。其核心思想包括两个随机性:Bootstrap采样(每棵树使用不同的训练子集)和特征随机选择(每棵树分裂时只考虑部分特征)。这种方法能有效处理大规模高维数据,避免过拟合,并评估特征重要性。随机森林的超参数如树的数量、最大深度等可通过网格搜索优化。该算法兼具强大预测能力和工程化优势,是机器学习中的常用基础模型。
348 164
|
7天前
|
机器学习/深度学习 自然语言处理 机器人
阿里云百炼大模型赋能|打造企业级电话智能体与智能呼叫中心完整方案
畅信达基于阿里云百炼大模型推出MVB2000V5智能呼叫中心方案,融合LLM与MRCP+WebSocket技术,实现语音识别率超95%、低延迟交互。通过电话智能体与座席助手协同,自动化处理80%咨询,降本增效显著,适配金融、电商、医疗等多行业场景。
359 155