持久化FileTxnSnapLog

简介: 持久化FileTxnSnapLog

一、前言

  前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类。

二、FileTxnSnapLog源码分析

2.1 类的属性

public class FileTxnSnapLog {
    //the direcotry containing the 
    //the transaction logs
    // 日志文件目录
    private final File dataDir;
    //the directory containing the
    //the snapshot directory
    // 快照文件目录
    private final File snapDir;
    // 事务日志
    private TxnLog txnLog;
    // 快照
    private SnapShot snapLog;
    // 版本号
    public final static int VERSION = 2;
    // 版本
    public final static String version = "version-";
    // Logger
    private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLog.class);
}

说明:类的属性中包含了TxnLog和SnapShot接口,即对FileTxnSnapLog的很多操作都会转发给TxnLog和SnapLog进行操作,这是一种典型的组合方法。

2.2 内部类

  FileTxnSnapLog包含了PlayBackListener内部类,用来接收事务应用过程中的回调,在Zookeeper数据恢复后期,会有事务修正过程,此过程会回调PlayBackListener来进行对应的数据修正。其源码如下 

public interface PlayBackListener {
    void onTxnLoaded(TxnHeader hdr, Record rec);
}

 说明:在完成事务操作后,会调用到onTxnLoaded方法进行相应的处理。

2.3 构造函数

  FileTxnSnapLog的构造函数如下

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
    LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);
    // 在datadir和snapdir下生成version-2目录
    this.dataDir = new File(dataDir, version + VERSION);
    this.snapDir = new File(snapDir, version + VERSION);
    if (!this.dataDir.exists()) { // datadir存在但无法创建目录,则抛出异常
        if (!this.dataDir.mkdirs()) {
            throw new IOException("Unable to create data directory "
                                  + this.dataDir);
        }
    }
    if (!this.snapDir.exists()) { // snapdir存在但无法创建目录,则抛出异常
        if (!this.snapDir.mkdirs()) {
            throw new IOException("Unable to create snap directory "
                                  + this.snapDir);
        }
    }
    // 给属性赋值
    txnLog = new FileTxnLog(this.dataDir);
    snapLog = new FileSnap(this.snapDir);
}

说明:对于构造函数而言,其会在传入的datadir和snapdir目录下新生成version-2的目录,并且会判断目录是否创建成功,之后会创建txnLog和snapLog。

2.4 核心函数分析

1. restore函数 

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // 根据snap文件反序列化dt和sessions
    snapLog.deserialize(dt, sessions);
    // 
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    // 获取比最后处理的zxid+1大的log文件的迭代器
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    // 最大的zxid
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // itr在read函数调用后就已经指向第一个合法的事务
            // 获取事务头
            hdr = itr.getHeader();
            if (hdr == null) { // 事务头为空
                // 表示日志文件为空
                return dt.lastProcessedZxid;
            }
            // 事务头的zxid小于snapshot中的最大zxid并且其不为0,则会报错,因为上面获取的是+1
            if (hdr.getZxid() < highestZxid && highestZxid != 0) { 
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                          new Object[] { highestZxid, hdr.getZxid(),
                                        hdr.getType() });
            } else { 
                // 重新赋值highestZxid
                highestZxid = hdr.getZxid();
            }
            try {
                // 在datatree上处理事务
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: " +
                                      hdr.getType() + " error: " + e.getMessage(), e);
            }
            // 每处理完一个事务都会进行回调
            listener.onTxnLoaded(hdr, itr.getTxn());
            if (!itr.next()) // 已无事务,跳出循环
                break;
        }
    } finally {
        if (itr != null) { // 迭代器不为空,则关闭
            itr.close();
        }
    }
    // 返回最高的zxid
    return highestZxid;
}

说明:restore用于恢复datatree和sessions,其步骤大致如下

  ① 根据snapshot文件反序列化datatree和sessions,进入②

  ② 获取比snapshot文件中的zxid+1大的log文件的迭代器,以对log文件中的事务进行迭代,进入③

  ③ 迭代log文件的每个事务,并且将该事务应用在datatree中,同时会调用onTxnLoaded函数进行后续处理,进入④

  ④ 关闭迭代器,返回log文件中最后一个事务的zxid(作为最高的zxid)

其中会调用到FileTxnLog的read函数,read函数在FileTxnLog中已经进行过分析,会调用processTransaction函数,其源码如下 

public void processTransaction(TxnHeader hdr,DataTree dt,
                               Map<Long, Integer> sessions, Record txn)
    throws KeeperException.NoNodeException {
    // 事务处理结果
    ProcessTxnResult rc;
    switch (hdr.getType()) { // 确定事务类型
        case OpCode.createSession: // 创建会话
            // 添加进会话
            sessions.put(hdr.getClientId(),
                         ((CreateSessionTxn) txn).getTimeOut());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                         "playLog --- create session in log: 0x"
                                         + Long.toHexString(hdr.getClientId())
                                         + " with timeout: "
                                         + ((CreateSessionTxn) txn).getTimeOut());
            }
            // give dataTree a chance to sync its lastProcessedZxid
            // 处理事务
            rc = dt.processTxn(hdr, txn);
            break;
        case OpCode.closeSession: // 关闭会话
            // 会话中移除
            sessions.remove(hdr.getClientId());
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                         "playLog --- close session in log: 0x"
                                         + Long.toHexString(hdr.getClientId()));
            }
            // 处理事务
            rc = dt.processTxn(hdr, txn);
            break;
        default:
            // 处理事务
            rc = dt.processTxn(hdr, txn);
    }
    /**
         * Snapshots are lazily created. So when a snapshot is in progress,
         * there is a chance for later transactions to make into the
         * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS
         * errors could occur. It should be safe to ignore these.
         */
    if (rc.err != Code.OK.intValue()) { // 忽略处理结果中可能出现的错误
        LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()
                  + ", error: " + rc.err + ", path: " + rc.path);
    }
}

说明:processTransaction会根据事务头中记录的事务类型(createSession、closeSession、其他类型)来进行相应的操作,对于createSession类型而言,其会将会话和超时时间添加至会话map中,对于closeSession而言,会话map会根据客户端的id号删除其会话(因为存储时候key=clientId),同时,所有的操作都会调用到dt.processTxn函数,其源码如下 

public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
    // 事务处理结果
    ProcessTxnResult rc = new ProcessTxnResult();
    try {
        // 从事务头中解析出相应属性并保存至rc中
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) { // 确定事务类型
            case OpCode.create: // 创建结点
                // 显示转化
                CreateTxn createTxn = (CreateTxn) txn;
                // 获取创建结点路径
                rc.path = createTxn.getPath();
                // 创建结点
                createNode(
                    createTxn.getPath(),
                    createTxn.getData(),
                    createTxn.getAcl(),
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(),
                    header.getZxid(), header.getTime());
                break;
            case OpCode.delete: // 删除结点
                // 显示转化
                DeleteTxn deleteTxn = (DeleteTxn) txn;
                // 获取删除结点路径
                rc.path = deleteTxn.getPath();
                // 删除结点
                deleteNode(deleteTxn.getPath(), header.getZxid());
                break;
            case OpCode.setData: // 写入数据
                // 显示转化
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                // 获取写入数据结点路径
                rc.path = setDataTxn.getPath();
                // 写入数据
                rc.stat = setData(setDataTxn.getPath(), setDataTxn
                                  .getData(), setDataTxn.getVersion(), header
                                  .getZxid(), header.getTime());
                break;
            case OpCode.setACL: // 设置ACL
                // 显示转化
                SetACLTxn setACLTxn = (SetACLTxn) txn;
                // 获取路径
                rc.path = setACLTxn.getPath();
                // 设置ACL
                rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
                                 setACLTxn.getVersion());
                break;
            case OpCode.closeSession: // 关闭会话
                // 关闭会话
                killSession(header.getClientId(), header.getZxid());
                break;
            case OpCode.error: // 错误
                // 显示转化
                ErrorTxn errTxn = (ErrorTxn) txn;
                // 记录错误
                rc.err = errTxn.getErr();
                break;
            case OpCode.check: // 检查
                // 显示转化
                CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
                // 获取路径
                rc.path = checkTxn.getPath();
                break;
            case OpCode.multi: // 多个事务
                // 显示转化
                MultiTxn multiTxn = (MultiTxn) txn ;
                // 获取事务列表
                List<Txn> txns = multiTxn.getTxns();
                rc.multiResult = new ArrayList<ProcessTxnResult>();
                boolean failed = false;
                for (Txn subtxn : txns) { // 遍历事务列表
                    if (subtxn.getType() == OpCode.error) {
                        failed = true;
                        break;
                    }
                }
                boolean post_failed = false;
                for (Txn subtxn : txns) { // 遍历事务列表,确定每个事务类型并进行相应操作
                    // 处理事务的数据
                    ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
                    Record record = null;
                    switch (subtxn.getType()) {
                        case OpCode.create:
                            record = new CreateTxn();
                            break;
                        case OpCode.delete:
                            record = new DeleteTxn();
                            break;
                        case OpCode.setData:
                            record = new SetDataTxn();
                            break;
                        case OpCode.error:
                            record = new ErrorTxn();
                            post_failed = true;
                            break;
                        case OpCode.check:
                            record = new CheckVersionTxn();
                            break;
                        default:
                            throw new IOException("Invalid type of op: " + subtxn.getType());
                    }
                    assert(record != null);
                    // 将bytebuffer转化为record(初始化record的相关属性)
                    ByteBufferInputStream.byteBuffer2Record(bb, record);
                    if (failed && subtxn.getType() != OpCode.error){ // 失败并且不为error类型
                        int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() 
                            : Code.OK.intValue();
                        subtxn.setType(OpCode.error);
                        record = new ErrorTxn(ec);
                    }
                    if (failed) { // 失败
                        assert(subtxn.getType() == OpCode.error) ;
                    }
                    // 生成事务头
                    TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),
                                                     header.getZxid(), header.getTime(), 
                                                     subtxn.getType());
                    // 递归调用处理事务
                    ProcessTxnResult subRc = processTxn(subHdr, record);
                    // 保存处理结果
                    rc.multiResult.add(subRc);
                    if (subRc.err != 0 && rc.err == 0) {
                        rc.err = subRc.err ;
                    }
                }
                break;
        }
    } catch (KeeperException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Failed: " + header + ":" + txn, e);
        }
        rc.err = e.code().intValue();
    } catch (IOException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Failed: " + header + ":" + txn, e);
        }
    }
    /*
         * A snapshot might be in progress while we are modifying the data
         * tree. If we set lastProcessedZxid prior to making corresponding
         * change to the tree, then the zxid associated with the snapshot
         * file will be ahead of its contents. Thus, while restoring from
         * the snapshot, the restore method will not apply the transaction
         * for zxid associated with the snapshot file, since the restore
         * method assumes that transaction to be present in the snapshot.
         *
         * To avoid this, we first apply the transaction and then modify
         * lastProcessedZxid.  During restore, we correctly handle the
         * case where the snapshot contains data ahead of the zxid associated
         * with the file.
         */
    // 事务处理结果中保存的zxid大于已经被处理的最大的zxid,则重新赋值
    if (rc.zxid > lastProcessedZxid) {
        lastProcessedZxid = rc.zxid;
    }
    /*
         * Snapshots are taken lazily. It can happen that the child
         * znodes of a parent are created after the parent
         * is serialized. Therefore, while replaying logs during restore, a
         * create might fail because the node was already
         * created.
         *
         * After seeing this failure, we should increment
         * the cversion of the parent znode since the parent was serialized
         * before its children.
         *
         * Note, such failures on DT should be seen only during
         * restore.
         */
    if (header.getType() == OpCode.create &&
        rc.err == Code.NODEEXISTS.intValue()) { // 处理在恢复数据过程中的结点创建操作
        LOG.debug("Adjusting parent cversion for Txn: " + header.getType() +
                  " path:" + rc.path + " err: " + rc.err);
        int lastSlash = rc.path.lastIndexOf('/');
        String parentName = rc.path.substring(0, lastSlash);
        CreateTxn cTxn = (CreateTxn)txn;
        try {
            setCversionPzxid(parentName, cTxn.getParentCVersion(),
                             header.getZxid());
        } catch (KeeperException.NoNodeException e) {
            LOG.error("Failed to set parent cversion for: " +
                      parentName, e);
            rc.err = e.code().intValue();
        }
    } else if (rc.err != Code.OK.intValue()) {
        LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +
                  " : error: " + rc.err);
    }
    return rc;
}

说明:processTxn用于处理事务,即将事务操作应用到DataTree【参见下面源码】内存数据库中,以恢复成最新的数据。其操作所有数据节点都是借助于DataNode,数据会类似于树状存储,

以删除为例:

public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    DataNode node = nodes.get(path);
    if (node == null) {
        throw new KeeperException.NoNodeException();
    }
    nodes.remove(path);
    DataNode parent = nodes.get(parentName);
    if (parent == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (parent) {
        parent.removeChild(childName);
        parent.stat.setPzxid(zxid);
        long eowner = node.stat.getEphemeralOwner();
        if (eowner != 0) {
            HashSet<String> nodes = ephemerals.get(eowner);
            if (nodes != null) {
                synchronized (nodes) {
                    nodes.remove(path);
                }
            }
        }
        node.parent = null;
    }
    ......
}

2. save函数  

public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
    throws IOException {
    // 获取最后处理的zxid
    long lastZxid = dataTree.lastProcessedZxid;
    // 生成snapshot文件
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
    // 序列化datatree、sessionsWithTimeouts至snapshot文件
    snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
}

说明:save函数用于将sessions和datatree保存至snapshot文件中,其大致步骤如下

  ① 获取内存数据库中已经处理的最新的zxid,进入②

  ② 根据zxid和快照目录生成snapshot文件,进入③

  ③ 将datatree(内存数据库)、sessionsWithTimeouts序列化至快照文件。

  其他的函数或多或少都是调用TxnLog和SnapLog中的相应函数,之前已经进行过分析,这里不再累赘。

三、总结

本篇博文分析了FileTxnSnapLog的源码,其主要:

  • 封装了TxnLog和SnapLog来进行相应的处理
  • 提供了从snapshot文件和log文件中恢复内存数据库的接口
  • 其对于事务的维护,或说事务的存储结构是一个个DataTree,代码层面看到的是DataNode,其增删都是追加/删除父子节点,类似于树状操作
目录
相关文章
|
双11
营销互动保障 - 捉猫猫在未知中前行
在云栖TechDay 活动第二十七期,淘宝移动平台资深无线开发工程师张升华(煮旺)详细讲解了双十一捉猫猫项目实战经验,在长达16天的营销互动中,开发团队面临了很多难点,在未知中前行;在处理这些问题,保障营销活动顺利进行的过程中也收获了大量的宝贵经验。分享内容十分精彩,不容错过。
4977 0
|
3天前
|
人工智能 自然语言处理 文字识别
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
Qwen3.7-Max是阿里云百炼面向智能体时代推出的新一代旗舰模型,对标GPT-5.5、Claude Opus 4.7等闭源旗舰。该模型支持百万级token上下文窗口,具备顶级推理能力、多模态搜索与视觉理解增强、流式输出低延迟响应等核心优势,覆盖编程、办公、长周期自主执行等复杂场景。同时支持OpenAI接口兼容,便于系统快速迁移。用户可通过Token Plan团队或节省计划等订阅方式灵活调用,适合企业级高要求场景使用。
8196 37
阿里云百炼Qwen3.7-Max简介:能力、优势、支持订阅计划参考
|
3天前
|
人工智能 运维 JavaScript
阿里云Qoder CN(原通义灵码)全解析 产品形态、版本划分与技术适配说明
在AI辅助开发与智能办公工具持续普及的当下,阿里云旗下原通义灵码正式更名为Qoder CN,同时延伸出QoderWork CN、Qoder CN CLI、Qoder CN Mobile等多款配套产品,形成覆盖代码开发、日常办公、终端交互、移动端使用的完整工具矩阵。Qoder CN核心定位为AI智能编码助手,深度适配主流代码编辑器、集成开发环境以及终端场景;QoderWork CN则偏向桌面端综合办公辅助,二者面向不同使用场景,划分了多个版本档位,搭配差异化资源配额、功能权限与计费规则,同时兼容多款主流大模型。
553 4
|
3天前
|
JavaScript 定位技术 API
CodeGraph 爆火:编程 Agent 需要的不是更多上下文,而是一张提前画好的代码地图
CodeGraph 是一款爆火的本地代码智能工具,通过 tree-sitter 解析 AST 构建结构化知识图谱(存于 SQLite),为编程 Agent 提前生成“代码地图”。它显著降低 Agent 在中大型项目中的探索成本——实测工具调用减少71%、Token 降57%、速度提升46%,支持19+语言及主流框架路由识别,完全离线、无需 API Key。
514 3
CodeGraph 爆火:编程 Agent 需要的不是更多上下文,而是一张提前画好的代码地图
|
2天前
|
缓存 测试技术 API
Qwen 3.7 Plus 与 Max 实测:性价比与多模态能力差异解析(2026)
2026 年 6 月 1 日,阿里悄无声息地发布了 Qwen 3.7 Plus,距 Qwen 3.7 Max 上线刚好 11 天。同样的 1M 上下文,同样的 35 小时自治上限。但价格才是头条:Plus 是 0.40/M输入,Max是 2.50/M——便宜约 6 倍——并且还能看图、看视频。Vision Arena 上 Plus 已经排到 #16。所以这周真正值得讨论的问题不是”要不要为视觉能力买单”,而是”Max 凭什么用 6 倍价格换来 2 个百分点的 benchmark 领先”。
|
3天前
|
数据采集 人工智能 前端开发
让 Coding Agent 从黑盒到透明:阿里云 Agent 观测审计数据采集实践
AI Agent 规模化落地带来执行黑盒、行为难追溯、成本难度量三大难题。阿里云基于 OTel 标准,面向 Coding Agent、个人通用助理和框架型 Agent,推出 LoongSuite Pilot、插件及探针等无侵入采集方案,让 Agent 实现可看见、可分析、可审计、可治理。
690 149
|
3天前
|
人工智能 缓存 自然语言处理
阿里Qwen3.7-Max评测:Agent能力显著提升,耗时与调用成本大幅下降
阿里云百炼推出面向智能体的旗舰大模型Qwen3.7-Max,具备长周期自主执行能力,显著提升编程、办公自动化等复杂任务处理水平;支持MCP集成与多框架兼容,并以限时5折+100万Tokens免费试用大幅降低使用门槛,助力企业高效落地AI应用。在阿里云百炼平台快速体验:https://t.aliyun.com/U/fPVHqY
1920 10
|
3天前
|
存储 安全 Java
AgentScope Java 2.0:打造分布式、企业级智能体底座
AgentScope 2.0 面向分布式部署、稳定运行、权限安全等企业级需求全面升级,打造支持多租户隔离与长期稳定运行的企业级智能体底座。
|
3天前
|
人工智能 安全 定位技术
CodeGraph深度解析 让Claude Code工具调用直降七成的核心原理与实操教程
如今以Claude Code为代表的AI编程智能体已经成为开发者日常编码、项目重构、漏洞修复的必备工具。但在长期使用过程中,几乎所有开发者都会遇到同一个明显痛点:AI虽然具备强大的代码生成与分析能力,却常常陷入盲目探索的循环中。
1321 2
|
3天前
|
人工智能 运维 API
2026年阿里云百炼通义千问Qwen3.7-plus深度介绍 功能特性、使用优势及618大促订阅方案指南
大模型技术的普及,让AI能力逐步融入个人办公、内容创作、代码编写、企业运营、教育培训等各类场景。不同定位的模型对应不同使用需求,旗舰级模型性能强劲但使用成本偏高,轻量化模型价格低廉却难以胜任复杂任务,而介于两者之间的中端主力模型,凭借均衡的能力、亲民的定价、广泛的场景适配性,成为绝大多数个人用户、小型团队、中小企业的首选。
681 1

热门文章

最新文章