持久化FileTxnLog

本文涉及的产品
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 本文深入分析Zookeeper持久化机制的源码实现,重点解析`FileTxnLog`和`TxnLog`等核心类。详细阐述事务日志的写入、读取、快照匹配及文件预分配等过程,揭示Zookeeper数据存储的底层原理,帮助理解其高可靠性的实现基础。(238字)

一、前言
  前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。
二、持久化总体框架
  持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。
  
  
· TxnLog,接口类型,读取事务性日志的接口。
  · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。
  · Snapshot,接口类型,持久层快照接口。
  · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
  · FileTxnSnapLog,封装了TxnLog和SnapShot。
  · Util,工具类,提供持久化所需的API。

  下面先来分析TxnLog和FileTxnLog的源码。
三、TxnLog源码分析
  TxnLog是接口,规定了对日志的响应操作。
public interface TxnLog {

/**
 * roll the current
 * log being appended to
 * @throws IOException 
 */
// 回滚日志
void rollLog() throws IOException;
/**
 * Append a request to the transaction log
 * @param hdr the transaction header
 * @param r the transaction itself
 * returns true iff something appended, otw false 
 * @throws IOException
 */
// 添加一个请求至事务性日志
boolean append(TxnHeader hdr, Record r) throws IOException;

/**
 * Start reading the transaction logs
 * from a given zxid
 * @param zxid
 * @return returns an iterator to read the 
 * next transaction in the logs.
 * @throws IOException
 */
// 读取事务性日志
TxnIterator read(long zxid) throws IOException;

/**
 * the last zxid of the logged transactions.
 * @return the last zxid of the logged transactions.
 * @throws IOException
 */
// 事务性操作的最新zxid
long getLastLoggedZxid() throws IOException;

/**
 * truncate the log to get in sync with the 
 * leader.
 * @param zxid the zxid to truncate at.
 * @throws IOException 
 */
// 清空日志,与Leader保持同步
boolean truncate(long zxid) throws IOException;

/**
 * the dbid for this transaction log. 
 * @return the dbid for this transaction log.
 * @throws IOException
 */
// 获取数据库的id
long getDbId() throws IOException;

/**
 * commmit the trasaction and make sure
 * they are persisted
 * @throws IOException
 */
// 提交事务并进行确认
void commit() throws IOException;

/** 
 * close the transactions logs
 */
// 关闭事务性日志
void close() throws IOException;
/**
 * an iterating interface for reading 
 * transaction logs. 
 */
// 读取事务日志的迭代器接口
public interface TxnIterator {
    /**
     * return the transaction header.
     * @return return the transaction header.
     */
    // 获取事务头部
    TxnHeader getHeader();

    /**
     * return the transaction record.
     * @return return the transaction record.
     */
    // 获取事务
    Record getTxn();

    /**
     * go to the next transaction record.
     * @throws IOException
     */
    // 下个事务
    boolean next() throws IOException;

    /**
     * close files and release the 
     * resources
     * @throws IOException
     */
    // 关闭文件释放资源
    void close() throws IOException;
}

}
其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。
四、FileTxnLog源码分析
  对于LogFile而言,其格式可分为如下三部分
  LogFile:
    FileHeader TxnList ZeroPad
  FileHeader格式如下  
  FileHeader: {
    magic 4bytes (ZKLG)
    version 4bytes
    dbid 8bytes
  }
  TxnList格式如下
  TxnList:
    Txn || Txn TxnList
  Txn格式如下
  Txn:
    checksum Txnlen TxnHeader Record 0x42
  Txnlen格式如下
  Txnlen:
    len 4bytes
  TxnHeader格式如下
  TxnHeader: {
    sessionid 8bytes
    cxid 4bytes
    zxid 8bytes
    time 8bytes
    type 4bytes
  }
  ZeroPad格式如下
  ZeroPad:
    0 padded to EOF (filled during preallocation stage)
  了解LogFile的格式对于理解源码会有很大的帮助。
4.1 属性 
public class FileTxnLog implements TxnLog {
private static final Logger LOG;

// 预分配大小 64M
static long preAllocSize =  65536 * 1024;

// 魔术数字,默认为1514884167
public final static int TXNLOG_MAGIC =
    ByteBuffer.wrap("ZKLG".getBytes()).getInt();

// 版本号
public final static int VERSION = 2;

/** Maximum time we allow for elapsed fsync before WARNing */
// 进行同步时,发出warn之前所能等待的最长时间
private final static long fsyncWarningThresholdMS;

// 静态属性,确定Logger、预分配空间大小和最长时间
static {
    LOG = LoggerFactory.getLogger(FileTxnLog.class);

    String size = System.getProperty("zookeeper.preAllocSize");
    if (size != null) {
        try {
            preAllocSize = Long.parseLong(size) * 1024;
        } catch (NumberFormatException e) {
            LOG.warn(size + " is not a valid value for preAllocSize");
        }
    }
    fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
}

// 最大(新)的zxid
long lastZxidSeen;
// 存储数据相关的流
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;

// log目录文件
File logDir;

// 是否强制同步
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;

// 数据库id
long dbId;

// 流列表
private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();

// 当前大小
long currentSize;
// 写日志文件
File logFileWrite = null;

}
4.2. 核心函数 

  1. append函数
    public synchronized boolean append(TxnHeader hdr, Record txn)

     throws IOException
    

    {

     if (hdr != null) { // 事务头部不为空
         if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid
             LOG.warn("Current zxid " + hdr.getZxid()
                     + " is <= " + lastZxidSeen + " for "
                     + hdr.getType());
         }
         if (logStream==null) { // 日志流为空
            if(LOG.isInfoEnabled()){
                 LOG.info("Creating new log file: log." +  
                         Long.toHexString(hdr.getZxid()));
            }
    
            // 
            logFileWrite = new File(logDir, ("log." + 
                    Long.toHexString(hdr.getZxid())));
            fos = new FileOutputStream(logFileWrite);
            logStream=new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            // 
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
            // 序列化
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            // 刷新到磁盘
            logStream.flush();
    
            // 当前通道的大小
            currentSize = fos.getChannel().position();
            // 添加fos
            streamsToFlush.add(fos);
         }
    
         // 填充文件
         padFile(fos);
    
         // Serializes transaction header and transaction data into a byte buffer.
         // 将事务头和事务数据序列化成Byte Buffer
         byte[] buf = Util.marshallTxnEntry(hdr, txn);
         if (buf == null || buf.length == 0) { // 为空,抛出异常
             throw new IOException("Faulty serialization for header " +
                     "and txn");
         }
         // 生成一个验证算法
         Checksum crc = makeChecksumAlgorithm();
         // Updates the current checksum with the specified array of bytes
         // 使用Byte数组来更新当前的Checksum
         crc.update(buf, 0, buf.length);
         // 写long类型数据
         oa.writeLong(crc.getValue(), "txnEntryCRC");
         // Write the serialized transaction record to the output archive.
         // 将序列化的事务记录写入OutputArchive
         Util.writeTxnBytes(oa, buf);
    
         return true;
     }
     return false;
    

    }
    说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下
      ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false
      ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤
      ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④
      ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤
      ⑤ 填充数据,填充0,进入⑥
      ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦
      ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧
      ⑧ 将更新的ByteBuffer写入磁盘文件,返回true
    append间接调用了padLog函数,其源码如下 
    public static long padLogFile(FileOutputStream f,long currentSize,

                           long preAllocSize) throws IOException{
    

    // 获取位置
    long position = f.getChannel().position();
    if (position + 4096 >= currentSize) { // 计算后是否大于当前大小

     // 重新设置当前大小,剩余部分填充0
     currentSize = currentSize + preAllocSize;
     fill.position(0);
     f.getChannel().write(fill, currentSize-fill.remaining());
    

    }
    return currentSize;
    }
    说明:padLog其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。

  2. getLogFiles函数 
    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
    // 按照zxid对文件进行排序
    List files = Util.sortDataDir(logDirList, "log", true);
    long logZxid = 0;
    // Find the log file that starts before or at the same time as the
    // zxid of the snapshot
    for (File f : files) { // 遍历文件
     // 从文件中获取zxid
     long fzxid = Util.getZxidFromName(f.getName(), "log");
     if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件
         continue;
     }
     // the files
     // are sorted with zxid's
     if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid)
         logZxid = fzxid;
     }
    
    }
    // 文件列表
    List v=new ArrayList(5);
    for (File f : files) { // 再次遍历文件
     // 从文件中获取zxid
     long fzxid = Util.getZxidFromName(f.getName(), "log");
     if (fzxid < logZxid) { // 跳过小于logZxid的文件
         continue;
     }
     // 添加
     v.add(f);
    
    }
    // 转化成File[] 类型后返回
    return v.toArray(new File[0]);

}
说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。
  ① 对所有log文件按照zxid进行升序排序,进入②
  ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③
  ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④
  ④ 转化后返回
getLogFiles函数调用了sortDataDir,其源码如下
public static List sortDataDir(File[] files, String prefix, boolean ascending)
{
if(files==null)
return new ArrayList(0);
// 转化为列表
List filelist = Arrays.asList(files);
// 进行排序,Comparator是关键,根据zxid进行排序
Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
return filelist;
}
说明:getLogFiles其用于排序log文件,可以选择根据zxid进行升序或降序。
getLogFiles函数间接调用了getZxidFromName,其源码如下: 
// 从文件名中解析出zxid
public static long getZxidFromName(String name, String prefix) {
long zxid = -1;
// 对文件名进行分割
String nameParts[] = name.split("\.");
if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同
try {
// 转化成长整形
zxid = Long.parseLong(nameParts[1], 16);
} catch (NumberFormatException e) {
}
}
return zxid;
}
说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。

相关文章
|
6月前
|
消息中间件 NoSQL Java
SpringCloud
本课程基于黑马程序员2025版SpringCloud实战教学,聚焦高频面试与实际应用,涵盖微服务、RabbitMQ、Redis高级、ElasticSearch等核心内容,结合项目实战与AI模块优化,助力掌握企业级开发技能。资料详尽,尊重原创,侵权必究。
|
6月前
|
Arthas 存储 运维
记Arthas实现一次CPU排查与代码热更新
本文介绍如何使用Arthas排查线上Java应用CPU占用过高问题,结合thread、watch、jad等指令定位阻塞线程与异常代码,实现无需重启服务的热更新修复,并通过profile生成火焰图进行性能分析,提升线上问题排查效率。
|
6月前
|
消息中间件 Java 数据库
异步消息组件MQ高级
本文详解RabbitMQ消息可靠性保障机制,涵盖生产者重试、确认机制(Confirm/Return)、消息持久化及消费可靠性。通过配置重试、回调处理与失败消息表结合定时任务重发,确保消息不丢失,提升系统稳定性。
|
6月前
|
存储 算法 BI
xxljob本地运行
本文介绍XXL-JOB分布式任务调度的使用教程。包含源码获取、服务端部署(数据库导入、配置修改、启动访问)、客户端运行与执行器注册,以及调度任务配置和路由策略详解。通过实际操作演示任务创建、参数设置、手动执行与日志查看,帮助开发者快速掌握XXL-JOB核心功能,实现定时任务的可视化管理和高效调度。(238字)
|
关系型数据库 MySQL 数据安全/隐私保护
|
存储 Kubernetes 监控
etcd:分布式键值存储系统技术
`etcd` 是一个用于共享配置和服务发现的高度可用键值存储系统,基于Raft算法保证数据一致性。它提供HTTP/GRPC API,常用于服务发现、配置共享和分布式锁。etcd集群包含多个节点,每个节点可为领导者或跟随者。在Kubernetes中,etcd存储集群状态,其稳定性和一致性至关重要。维护etcd涉及备份、状态监控、日志审计和安全措施。
859 2
|
存储 缓存 数据安全/隐私保护
说一说你对移动应用中的离线模式的实现。
【4月更文挑战第2天】移动应用的离线模式允许用户在无网情况下仍能部分使用应用,依赖于数据缓存和本地存储。应用在联网时缓存关键数据,离线时从本地读取。数据同步通过延迟策略在重连时完成,敏感信息加密存储并定期备份。开发者还需关注用户体验、性能优化及错误处理,确保离线模式的无缝衔接和稳定性。
1264 1
|
SQL JSON 监控
实时计算 Flink版产品使用合集之直接将 JSON 字符串解析为数组的内置函数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
安全 Linux 数据安全/隐私保护
代理ip全局代理是什么且如何设置
代理ip全局代理是什么且如何设置
962 0
|
前端开发 Java 数据库连接
开源一个基于SpringBoot的慈善公益平台(一)
开源一个基于SpringBoot的慈善公益平台
507 0