深入浅出Zookeeper源码(二):存储技术

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: 在上篇文章中,我们简单提到了Zookeeper的几个核心点。在这篇文章中,我们就来探索其存储技术。在开始前,读者可以考虑思考下列问题:- Zookeeper的数据存储是如何实现的?- Zookeeper进行一次写操作的时候,会发生什么å?- 当一个Zookeeper新加入现有集群时,如何同步现集群中的数据?
版本 日期 备注
1.0 2020.3.12 文章首发
1.0.1 2020.3.16 改进部分大小写问题及形容方式
1.0.2 2020.3.21 改进可能会引起错误理解的部分
1.0.3 2020.3.29 修改标题
1.0.4 2020.4.18 改进小结部分
1.0.5 2020.6.26 更新部分部分解释,改进注释风格
1.0.6 2020.7.6 增加部分详细解释

前言

在上篇文章中,我们简单提到了Zookeeper的几个核心点。在这篇文章中,我们就来探索其存储技术。在开始前,读者可以考虑思考下列问题:

  • Zookeeper的数据存储是如何实现的?
  • Zookeeper进行一次写操作的时候,会发生什么å?
  • 当一个Zookeeper新加入现有集群时,如何同步现集群中的数据?

抱着问题,我们进入下面的内容。

Zookeper本地存储模型

众所周知,Zookeeper不擅长大量数据的读写,因为:

  1. 本质上就是一个内存里的字典。
  2. 持久化节点的写入由于WAL会导致刷盘,过大的数据会引起额外的seek
  3. 同样的,在zk启动时,所有的数据会从WAL的日志中读出。如果过大,也会导致启动时间较长。

而内存中的数据,也被称为ZkDatabase(Zk的内存数据库),由它来负责管理Zk的会话DataTree存储和事务日志,它也会定时向磁盘dump快照数据,在Zk启动时,也会通过事务日志和快照数据来恢复内存中的数据。

既然Zk的数据是在内存里的,那么它是如何解决数据持久化问题的呢?上一段我们已经提到了:即通过事务日志——WAL,在每次写请求前,都会根据目前的zxid来写log,将请求先记录到日志中。

接下来,我们来谈谈WAL的优化措施。

WAL的优化

WAL优化方案1:Group Commit

一般的WAL中每次写完END都要调用一次耗时的sync API,这其实是会影响到系统的性能。为了解决这个问题,我们可以一次提交多个数据写入——只在最后一个数据写入的END日志之后,才调用sync API。like this:

  • without group commit: BEGIN Data1 END Sync BEGIN Data2 END Sync BEGIN Data3 END Sync
  • with group commit: BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync

凡事都有代价,这可能会引起数据一致性相关的问题。

WAL优化方案2:File Padding

在往 WAL 里面追加日志的时候,如果当前的文件 block 不能保存新添加的日志,就要为文件分配新的 block,这要更新文件 inode 里面的信息(例如 size)。如果我们使用的是 HHD 的话,就要先 seek 到 inode 所在的位置,然后回到新添加 block 的位置进行日志追加,这些都是发生在写事务日志时,这会明显拖慢系统的性能。

为了减少这些 seek,我们可以预先为 WAL 分配 block。例如 ZooKeeper 当检测到当前事务日志文件不足4KB时,就会填充0使该文件到64MB(这里0仅仅作为填充位)。并新建一个64MB的文件。

所以这也是Zookeeper不擅长读写大数据的原因之一,这会引起大量的block分配。

WAL优化方案3:Snapshot

如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。

除了解决启动时间过长的问题之外,快照还可以减少存储空间的使用。WAL 的多个日志条目有可能是对同一个数据的改动,通过快照,就可以只保留最新的数据改动(Merge)。

Zk的确采用了这个方案来做优化。还带来的一个好处是:在一个节点加入时,可以用最新的Snapshot传过去便于同步数据。

源码解析

本节内容都以3.5.7版本为例

核心接口和类

  • TxnLog:接口类型,提供读写事务日志的API。
  • FileTxnLog:基于文件的TxnLog实现。
  • Snapshot:快照接口类型,提供序列化、反序列化、访问快照API。
  • FileSnapshot:基于文件的Snapshot实现。
  • FileTxnSnapLog:TxnLog和Snapshot的封装
  • DataTree:Zookeeper的内存数据结构,ZNode构成的树。
  • DataNode:表示一个ZNode。

TxnLog

TxnLog是我们前面提到的事务日志。那么接下来我们就来看它的相关源码。

先看注释:

package org.apache.zookeeper.server.persistence;

import ...

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {
   

在注释中,我们可以看到一个FileLog由三部分组成:

  • FileHeader
  • TxnList
  • ZerdPad

关于FileHeader,可以理解其为一个标示符。TxnList则为主要内容。ZeroPad是一个终结符。

TxnLog.append

我们来看看最典型的append方法,可以将其理解WAL过程中的核心方法:

    /**
     * append an entry to the transaction log
     * @param hdr the header of the transaction
     * @param txn the transaction part of the entry
     * returns true iff something appended, otw false
     */
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
   
        if (hdr == null) {
    //为null意味着这是一个读请求,直接返回
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
   
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
   
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream==null) {
    //为空的话则new一个Stream
           if(LOG.isInfoEnabled()){
   
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }

           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");   //写file header
           // Make sure that the magic number is written before padding.
           logStream.flush();      // zxid必须比日志先落盘
           filePadding.setCurrentSize(fos.getChannel().position());
           streamsToFlush.add(fos); //加入需要Flush的队列
        }
        filePadding.padFile(fos.getChannel());   //确定是否要扩容。每次64m扩容
        byte[] buf = Util.marshallTxnEntry(hdr, txn);  //序列化写入
        if (buf == null || buf.length == 0) {
   
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();   //生成butyArray的checkSum
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");//写入日志里
        Util.writeTxnBytes(oa, buf);
        return true;
    }

这里有个zxid(ZooKeeper Transaction Id),有点像MySQL的GTID。每次对Zookeeper的状态的改变都会产生一个zxid,zxid是全局有序的,如果zxid1小于zxid2,则zxid1在zxid2之前发生。

简单分析一下写入过程:

  1. 确定要写的事务日志:当Zk启动完成或日志写满时,会与日志文件断开连接。这个时候会根据zxid创建一个日志。
  2. 是否需要预分配:如果检测到当前日志剩余空间不足4KB时
  3. 事务序列化
  4. 为每个事务生成一个Checksum,目的是为了校验数据的完整性和一致性。
  5. 写入文件,不过是写在Buffer里,并未落盘。
  6. 落盘。根据用户配置来决定是否强制落盘。

TxnLog.commit

这个方法被调用的时机大致有:

  • 服务端比较闲的时候去调用
  • 到请求数量超出1000时,调用。之前提到过GroupCommit,其实就是在这个时候调用的。
  • zk的shutdown钩子被调用时,调用
    /**
     * commit the logs. make sure that everything hits the
     * disk
     */
    public synchronized void commit() throws IOException {
   
        if (logStream != null) {
   
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
   
            log.flush();
            if (forceSync) {
   
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);//对应fdataSync

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
   
                    if(serverStats != null) {
   
                        serverStats.incrementFsyncThresholdExceedCount();
                    }
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
   
            streamsToFlush.removeFirst().close();
        }
    }

代码非常的简单。如果logStream还有,那就先刷下去。然后遍历待flush的队列(是个链表,用来保持操作顺序),同时还会关注写入的时间,如果过长,则会打一个Warn的日志。

DataTree和DataNode

DataTree是Zk的内存数据结构——就是我们之前说到的MTable。它以树状结构来组织DataNode。

这么听起来可能有点云里雾里,不妨直接看一下DataNode的相关代码。

public class DataNode implements Record {
   
    /** the data for this datanode */
    byte data[];

    /**
     * the acl map long for this datanode. the datatree has the map
     */
    Long acl;

    /**
     * the stat for this node that is persisted to disk.
     */
    public StatPersisted stat;

    /**
     * the list of children for this node. note that the list of children string
     * does not contain the parent path -- just the last part of the path. This
     * should be synchronized on except deserializing (for speed up issues).
     */
    private Set<String> children = null;
.....
}

如果用过ZkClient的小伙伴,可能非常熟悉。这就是我们根据一个path获取数据时返回的相关属性——这就是用来描述存储数据的一个类。注意,DataNode还会维护它的Children。

简单了解DataNode后,我们来看一下DataTree。为了避免干扰,我们选出最关键的成员变量:

public class DataTree {
   
    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

    /**
     * This hashtable lists the paths of the ephemeral nodes of a session.
     */
    private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>();
    .......
}

我们可以看到,DataTree本质上是通过一个ConcurrentHashMap来存储DataNode的(临时节点也是)。保存的是 DataNode 的 path 到 DataNode 的映射。

那为什么要保存两个状态呢?这得看调用它们被调用的场景:

  • 一般CRUD ZNode的请求都是走ConcurrentHashMap的
  • 序列化DataTree的时候会从Root节点开始遍历所有节点

如果需要获取所有节点的信息,显然遍历树会比一个个从ConcurrentHashMap 拿快。

接下来看一下序列化的相关代码:

DataNode的序列化方法

    /**
     * this method uses a stringbuilder to create a new path for children. This
     * is faster than string appends ( str1 + str2).
     *
     * @param oa
     *            OutputArchive to write to.
     * @param path
     *            a string builder.
     * @throws IOException
     * @throws InterruptedException
     */
    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
   
        String pathString = path.toString();
        DataNode node = getNode(pathString);
        if (node == null) {
   
            return;
        }
        String children[] = null;
        DataNode nodeCopy;
        synchronized (node) {
   
            StatPersisted statCopy = new StatPersisted();
            copyStatPersisted(node.stat, statCopy);
            //we do not need to make a copy of node.data because the contents
            //are never changed
            nodeCopy = new DataNode(node.data, node.acl, statCopy);
            Set<String> childs = node.getChildren();
            children = childs.toArray(new String[childs.size()]);
        }
        serializeNodeData(oa, pathString, nodeCopy);
        path.append('/');
        int off = path.length();
        for (String child : children) {
   
            // since this is single buffer being resused
            // we need
            // to truncate the previous bytes of string.
            path.delete(off, Integer.MAX_VALUE);
            path.append(child);
            serializeNode(oa, path);
        }
    }

可以看到,的确是通过DataNode的Children来遍历所有节点。

DataNode的反序列化方法

接下来看一下反序列化的代码:

    public void deserialize(InputArchive ia, String tag) throws IOException {
   
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        String path = ia.readString("path");
        while (!"/".equals(path)) {
   
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
   
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
   
                root = node;
            } else {
   
                String parentPath = path.substring(0, lastSlash);
                DataNode parent = nodes.get(parentPath);
                if (parent == null) {
   
                    throw new IOException("Invalid Datatree, unable to find " +
                            "parent " + parentPath + " of path " + path);
                }
                parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                EphemeralType ephemeralType = EphemeralType.get(eowner);
                if (ephemeralType == EphemeralType.CONTAINER) {
   
                    containers.add(path);
                } else if (ephemeralType == EphemeralType.TTL) {
   
                    ttls.add(path);
                } else if (eowner != 0) {
   
                    HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {
   
                        list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        nodes.put("/", root);
        // we are done with deserializing the
        // the datatree
        // update the quotas - create path trie
        // and also update the stat nodes
        setupQuota();

        aclCache.purgeUnused();
    }

因为序列化的时候是前序遍历。所以反序列化时是先反序列化父亲节点,再反序列化孩子节点。

Snapshot

那么DataTree在什么情况下会序列化呢?在这里就要提到快照了。

前面提到过:如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。

除了减少WAL日志,Snapshot还会在Zk全量同步时被用到——当一个全新的ZkServer(这个一般叫Learner)被加入集群时,Leader服务器会将本机上的数据全量同步给新来的ZkServer。

序列化

接下来看一下代码入口:

    /**
     * serialize the datatree and session into the file snapshot
     * @param dt the datatree to be serialized
     * @param sessions the sessions to be serialized
     * @param snapShot the file to store snapshot into
     */
    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
            throws IOException {
   
        if (!close) {
   
            try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
                 CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
   
                //CheckedOutputStream cout = new CheckedOutputStream()
                OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                serialize(dt, sessions, oa, header);
                long val = crcOut.getChecksum().getValue();
                oa.writeLong(val, "val");
                oa.writeString("/", "path");
                sessOS.flush();
            }
        } else {
   
            throw new IOException("FileSnap has already been closed");
        }
    }

JavaIO的基础知识在这不再介绍,有兴趣的人可以自行查阅资料或看 从一段代码谈起——浅谈JavaIO接口

本质就是创建文件,并调用DataTree的序列化方法,DataTree的序列化其实就是遍历DataNode去序列化,最后将这些序列化的内容写入文件。

反序列化

    /**
     * deserialize a data tree from the most recent snapshot
     * @return the zxid of the snapshot
     */
    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
   
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
   
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
   
            snap = snapList.get(i);
            LOG.info("Reading snapshot " + snap);
            try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
   
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
   
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
   
                LOG.warn("problem reading snap file " + snap, e);
            }
        }
        if (!foundValid) {
   
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

简单来说,先读取Snapshot文件们。并反序列化它们,组成DataTree。

小结

在本文中,笔者和大家一起学习了Zk的底层存储技术。在此处,我们做个简单的回顾:

  • zk的数据主要维护在内存中。在写入内存前,会做WAL,同时也会定期的做快照持久化到磁盘
  • WAL的常见优化手段有三种:Group Commit、File Padding、Snapshot

另外,Zk中序列化技术用的是Apache Jute——本质上调用了JavaDataOutput和Input,较为简单。故没在本文中展开。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
5月前
|
存储 负载均衡 算法
深入浅出Zookeeper源码(七):Leader选举
对于一个分布式集群来说,保证数据写入一致性最简单的方式就是依靠一个节点来调度和管理其他节点。在分布式系统中我们一般称其为Leader。
170 6
|
4月前
|
Apache
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
Apache ZooKeeper - 构建ZooKeeper源码环境及StandAlone模式下的服务端和客户端启动
46 2
|
5月前
|
存储 设计模式 算法
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
当我们向zk发出一个数据更新请求时,这个请求的处理流程是什么样的?zk又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。
137 1
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
|
5月前
|
存储 设计模式 算法
深入浅出Zookeeper源码(六):客户端的请求在服务器中经历了什么
当我们向zk发出一个数据更新请求时,这个请求的处理流程是什么样的?zk又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。
117 0
|
5月前
|
Java API 开发者
深入浅出Zookeeper源码(四):Watch实现剖析
用过zookeeper的同学都知道watch是一个非常好用的机制,今天我们就来看看它的实现原理。 在正文开始前,我们先来简单回忆一下watch是什么? zk提供了分布式数据的发布/订阅功能——即典型的发布订阅模型,其定义了一种一对多的订阅关系,能够让多个订阅者同时监听某个主题对象,当这个主题对象自身状态变化时,则会通知所有订阅者。具体来说,则是zk允许一个客户端向服务端注册一个watch监听,当服务端的一些指定事件触发了这个watch,那么就会向该客户端发送事件通知。
91 0
|
5月前
|
网络协议 数据库
深入浅出Zookeeper源码(五):BadVersionException到底是怎么一回事
最近在开发时偶尔会观测到zk报出`BadVersionException`,后在搜索引起上得知了是乐观锁相关的问题,很快就解决了问题。不过学而不思则罔:无论是单体应用还是分布式系统,在运行过程中总要有一种**机制**来保证数据排他性。接下来,我们就来看看zk是如何实现这种**机制**的。
103 1
|
5月前
|
网络协议 数据库
深入浅出Zookeeper源码(三):会话管理
我们知道zookeeper是一个分布式协同系统。在一个大型的分布式系统中,必然会有大量的client来连接zookeeper。那么zookeeper是如何管理这些session的生命周期呢?带着这个问题,我们进入今天的正文。
104 1
|
5月前
|
消息中间件 Java Shell
Linux【脚本 03】shell脚本离线安装配置集结JDK+InfluxDB+Zookeeper+Kafka(安装文件及脚本源码网盘分享)
Linux【脚本 03】shell脚本离线安装配置集结JDK+InfluxDB+Zookeeper+Kafka(安装文件及脚本源码网盘分享)
29 0
|
11月前
|
Java Apache Maven
Zookeeper源码在本地编译启动
Zookeeper源码在本地编译启动
88 0
|
12月前
|
算法 网络协议 Apache
Apache ZooKeeper - 选举Leader源码流程深度解析
Apache ZooKeeper - 选举Leader源码流程深度解析
94 0