45-微服务技术栈(高级):分布式协调服务zookeeper源码篇(持久化FileTxnSnapLog)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类。

一、前言

  前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类。

二、FileTxnSnapLog源码分析

2.1 类的属性

public class FileTxnSnapLog {

   //the direcotry containing the

   //the transaction logs

   // 日志文件目录

   private final File dataDir;

   //the directory containing the

   //the snapshot directory

   // 快照文件目录

   private final File snapDir;

   // 事务日志

   private TxnLog txnLog;

   // 快照

   private SnapShot snapLog;

   // 版本号

   public final static int VERSION = 2;

   // 版本

   public final static String version = "version-";


   // Logger

   private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLog.class);

}

说明:类的属性中包含了TxnLog和SnapShot接口,即对FileTxnSnapLog的很多操作都会转发给TxnLog和SnapLog进行操作,这是一种典型的组合方法。

2.2 内部类

  FileTxnSnapLog包含了PlayBackListener内部类,用来接收事务应用过程中的回调,在Zookeeper数据恢复后期,会有事务修正过程,此过程会回调PlayBackListener来进行对应的数据修正。其源码如下 

public interface PlayBackListener {

   void onTxnLoaded(TxnHeader hdr, Record rec);

}

 说明:在完成事务操作后,会调用到onTxnLoaded方法进行相应的处理。

2.3 构造函数

  FileTxnSnapLog的构造函数如下

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {

   LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

   // 在datadir和snapdir下生成version-2目录

   this.dataDir = new File(dataDir, version + VERSION);

   this.snapDir = new File(snapDir, version + VERSION);

   if (!this.dataDir.exists()) { // datadir存在但无法创建目录,则抛出异常

       if (!this.dataDir.mkdirs()) {

           throw new IOException("Unable to create data directory "

                                 + this.dataDir);

       }

   }

   if (!this.snapDir.exists()) { // snapdir存在但无法创建目录,则抛出异常

       if (!this.snapDir.mkdirs()) {

           throw new IOException("Unable to create snap directory "

                                 + this.snapDir);

       }

   }

   // 给属性赋值

   txnLog = new FileTxnLog(this.dataDir);

   snapLog = new FileSnap(this.snapDir);

}

说明:对于构造函数而言,其会在传入的datadir和snapdir目录下新生成version-2的目录,并且会判断目录是否创建成功,之后会创建txnLog和snapLog。

2.4 核心函数分析

1. restore函数 

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {

   // 根据snap文件反序列化dt和sessions

   snapLog.deserialize(dt, sessions);

   //

   FileTxnLog txnLog = new FileTxnLog(dataDir);

   // 获取比最后处理的zxid+1大的log文件的迭代器

   TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);

   // 最大的zxid

   long highestZxid = dt.lastProcessedZxid;


   TxnHeader hdr;

   try {

       while (true) {

           // itr在read函数调用后就已经指向第一个合法的事务

           // 获取事务头

           hdr = itr.getHeader();

           if (hdr == null) { // 事务头为空

               // 表示日志文件为空

               return dt.lastProcessedZxid;

           }

           // 事务头的zxid小于snapshot中的最大zxid并且其不为0,则会报错,因为上面获取的是+1

           if (hdr.getZxid() < highestZxid && highestZxid != 0) {

               LOG.error("{}(higestZxid) > {}(next log) for type {}",

                         new Object[] { highestZxid, hdr.getZxid(),

                                       hdr.getType() });

           } else {

               // 重新赋值highestZxid

               highestZxid = hdr.getZxid();

           }

           try {

               // 在datatree上处理事务

               processTransaction(hdr,dt,sessions, itr.getTxn());

           } catch(KeeperException.NoNodeException e) {

               throw new IOException("Failed to process transaction type: " +

                                     hdr.getType() + " error: " + e.getMessage(), e);

           }

           // 每处理完一个事务都会进行回调

           listener.onTxnLoaded(hdr, itr.getTxn());

           if (!itr.next()) // 已无事务,跳出循环

               break;

       }

   } finally {

       if (itr != null) { // 迭代器不为空,则关闭

           itr.close();

       }

   }

   // 返回最高的zxid

   return highestZxid;

}

说明:restore用于恢复datatree和sessions,其步骤大致如下

  ① 根据snapshot文件反序列化datatree和sessions,进入②

  ② 获取比snapshot文件中的zxid+1大的log文件的迭代器,以对log文件中的事务进行迭代,进入③

  ③ 迭代log文件的每个事务,并且将该事务应用在datatree中,同时会调用onTxnLoaded函数进行后续处理,进入④

  ④ 关闭迭代器,返回log文件中最后一个事务的zxid(作为最高的zxid)

其中会调用到FileTxnLog的read函数,read函数在FileTxnLog中已经进行过分析,会调用processTransaction函数,其源码如下 

public void processTransaction(TxnHeader hdr,DataTree dt,

                              Map<Long, Integer> sessions, Record txn)

   throws KeeperException.NoNodeException {

   // 事务处理结果

   ProcessTxnResult rc;

   switch (hdr.getType()) { // 确定事务类型

       case OpCode.createSession: // 创建会话

           // 添加进会话

           sessions.put(hdr.getClientId(),

                        ((CreateSessionTxn) txn).getTimeOut());

           if (LOG.isTraceEnabled()) {

               ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,

                                        "playLog --- create session in log: 0x"

                                        + Long.toHexString(hdr.getClientId())

                                        + " with timeout: "

                                        + ((CreateSessionTxn) txn).getTimeOut());

           }

           // give dataTree a chance to sync its lastProcessedZxid

           // 处理事务

           rc = dt.processTxn(hdr, txn);

           break;

       case OpCode.closeSession: // 关闭会话

           // 会话中移除

           sessions.remove(hdr.getClientId());

           if (LOG.isTraceEnabled()) {

               ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,

                                        "playLog --- close session in log: 0x"

                                        + Long.toHexString(hdr.getClientId()));

           }

           // 处理事务

           rc = dt.processTxn(hdr, txn);

           break;

       default:

           // 处理事务

           rc = dt.processTxn(hdr, txn);

   }


   /**

        * Snapshots are lazily created. So when a snapshot is in progress,

        * there is a chance for later transactions to make into the

        * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS

        * errors could occur. It should be safe to ignore these.

        */

   if (rc.err != Code.OK.intValue()) { // 忽略处理结果中可能出现的错误

       LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()

                 + ", error: " + rc.err + ", path: " + rc.path);

   }

}

说明:processTransaction会根据事务头中记录的事务类型(createSession、closeSession、其他类型)来进行相应的操作,对于createSession类型而言,其会将会话和超时时间添加至会话map中,对于closeSession而言,会话map会根据客户端的id号删除其会话(因为存储时候key=clientId),同时,所有的操作都会调用到dt.processTxn函数,其源码如下 

public ProcessTxnResult processTxn(TxnHeader header, Record txn)

{

   // 事务处理结果

   ProcessTxnResult rc = new ProcessTxnResult();

   try {

       // 从事务头中解析出相应属性并保存至rc中

       rc.clientId = header.getClientId();

       rc.cxid = header.getCxid();

       rc.zxid = header.getZxid();

       rc.type = header.getType();

       rc.err = 0;

       rc.multiResult = null;

       switch (header.getType()) { // 确定事务类型

           case OpCode.create: // 创建结点

               // 显示转化

               CreateTxn createTxn = (CreateTxn) txn;

               // 获取创建结点路径

               rc.path = createTxn.getPath();

               // 创建结点

               createNode(

                   createTxn.getPath(),

                   createTxn.getData(),

                   createTxn.getAcl(),

                   createTxn.getEphemeral() ? header.getClientId() : 0,

                   createTxn.getParentCVersion(),

                   header.getZxid(), header.getTime());

               break;

           case OpCode.delete: // 删除结点

               // 显示转化

               DeleteTxn deleteTxn = (DeleteTxn) txn;

               // 获取删除结点路径

               rc.path = deleteTxn.getPath();

               // 删除结点

               deleteNode(deleteTxn.getPath(), header.getZxid());

               break;

           case OpCode.setData: // 写入数据

               // 显示转化

               SetDataTxn setDataTxn = (SetDataTxn) txn;

               // 获取写入数据结点路径

               rc.path = setDataTxn.getPath();

               // 写入数据

               rc.stat = setData(setDataTxn.getPath(), setDataTxn

                                 .getData(), setDataTxn.getVersion(), header

                                 .getZxid(), header.getTime());

               break;

           case OpCode.setACL: // 设置ACL

               // 显示转化

               SetACLTxn setACLTxn = (SetACLTxn) txn;

               // 获取路径

               rc.path = setACLTxn.getPath();

               // 设置ACL

               rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),

                                setACLTxn.getVersion());

               break;

           case OpCode.closeSession: // 关闭会话

               // 关闭会话

               killSession(header.getClientId(), header.getZxid());

               break;

           case OpCode.error: // 错误

               // 显示转化

               ErrorTxn errTxn = (ErrorTxn) txn;

               // 记录错误

               rc.err = errTxn.getErr();

               break;

           case OpCode.check: // 检查

               // 显示转化

               CheckVersionTxn checkTxn = (CheckVersionTxn) txn;

               // 获取路径

               rc.path = checkTxn.getPath();

               break;

           case OpCode.multi: // 多个事务

               // 显示转化

               MultiTxn multiTxn = (MultiTxn) txn ;

               // 获取事务列表

               List<Txn> txns = multiTxn.getTxns();

               rc.multiResult = new ArrayList<ProcessTxnResult>();

               boolean failed = false;

               for (Txn subtxn : txns) { // 遍历事务列表

                   if (subtxn.getType() == OpCode.error) {

                       failed = true;

                       break;

                   }

               }

               boolean post_failed = false;

               for (Txn subtxn : txns) { // 遍历事务列表,确定每个事务类型并进行相应操作

                   // 处理事务的数据

                   ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());

                   Record record = null;

                   switch (subtxn.getType()) {

                       case OpCode.create:

                           record = new CreateTxn();

                           break;

                       case OpCode.delete:

                           record = new DeleteTxn();

                           break;

                       case OpCode.setData:

                           record = new SetDataTxn();

                           break;

                       case OpCode.error:

                           record = new ErrorTxn();

                           post_failed = true;

                           break;

                       case OpCode.check:

                           record = new CheckVersionTxn();

                           break;

                       default:

                           throw new IOException("Invalid type of op: " + subtxn.getType());

                   }

                   assert(record != null);

                   // 将bytebuffer转化为record(初始化record的相关属性)

                   ByteBufferInputStream.byteBuffer2Record(bb, record);


                   if (failed && subtxn.getType() != OpCode.error){ // 失败并且不为error类型

                       int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()

                           : Code.OK.intValue();


                       subtxn.setType(OpCode.error);

                       record = new ErrorTxn(ec);

                   }


                   if (failed) { // 失败

                       assert(subtxn.getType() == OpCode.error) ;

                   }


                   // 生成事务头

                   TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),

                                                    header.getZxid(), header.getTime(),

                                                    subtxn.getType());

                   // 递归调用处理事务

                   ProcessTxnResult subRc = processTxn(subHdr, record);

                   // 保存处理结果

                   rc.multiResult.add(subRc);

                   if (subRc.err != 0 && rc.err == 0) {

                       rc.err = subRc.err ;

                   }

               }

               break;

       }

   } catch (KeeperException e) {

       if (LOG.isDebugEnabled()) {

           LOG.debug("Failed: " + header + ":" + txn, e);

       }

       rc.err = e.code().intValue();

   } catch (IOException e) {

       if (LOG.isDebugEnabled()) {

           LOG.debug("Failed: " + header + ":" + txn, e);

       }

   }

   /*

        * A snapshot might be in progress while we are modifying the data

        * tree. If we set lastProcessedZxid prior to making corresponding

        * change to the tree, then the zxid associated with the snapshot

        * file will be ahead of its contents. Thus, while restoring from

        * the snapshot, the restore method will not apply the transaction

        * for zxid associated with the snapshot file, since the restore

        * method assumes that transaction to be present in the snapshot.

        *

        * To avoid this, we first apply the transaction and then modify

        * lastProcessedZxid.  During restore, we correctly handle the

        * case where the snapshot contains data ahead of the zxid associated

        * with the file.

        */

   // 事务处理结果中保存的zxid大于已经被处理的最大的zxid,则重新赋值

   if (rc.zxid > lastProcessedZxid) {

       lastProcessedZxid = rc.zxid;

   }


   /*

        * Snapshots are taken lazily. It can happen that the child

        * znodes of a parent are created after the parent

        * is serialized. Therefore, while replaying logs during restore, a

        * create might fail because the node was already

        * created.

        *

        * After seeing this failure, we should increment

        * the cversion of the parent znode since the parent was serialized

        * before its children.

        *

        * Note, such failures on DT should be seen only during

        * restore.

        */

   if (header.getType() == OpCode.create &&

       rc.err == Code.NODEEXISTS.intValue()) { // 处理在恢复数据过程中的结点创建操作

       LOG.debug("Adjusting parent cversion for Txn: " + header.getType() +

                 " path:" + rc.path + " err: " + rc.err);

       int lastSlash = rc.path.lastIndexOf('/');

       String parentName = rc.path.substring(0, lastSlash);

       CreateTxn cTxn = (CreateTxn)txn;

       try {

           setCversionPzxid(parentName, cTxn.getParentCVersion(),

                            header.getZxid());

       } catch (KeeperException.NoNodeException e) {

           LOG.error("Failed to set parent cversion for: " +

                     parentName, e);

           rc.err = e.code().intValue();

       }

   } else if (rc.err != Code.OK.intValue()) {

       LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +

                 " : error: " + rc.err);

   }

   return rc;

}

说明:processTxn用于处理事务,即将事务操作应用到DataTree【参见下面源码】内存数据库中,以恢复成最新的数据。其操作所有数据节点都是借助于DataNode,数据会类似于树状存储,

以删除为例:

public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException {

   int lastSlash = path.lastIndexOf('/');

   String parentName = path.substring(0, lastSlash);

   String childName = path.substring(lastSlash + 1);

   DataNode node = nodes.get(path);

   if (node == null) {

       throw new KeeperException.NoNodeException();

   }

   nodes.remove(path);

   DataNode parent = nodes.get(parentName);

   if (parent == null) {

       throw new KeeperException.NoNodeException();

   }

   synchronized (parent) {

       parent.removeChild(childName);

       parent.stat.setPzxid(zxid);

       long eowner = node.stat.getEphemeralOwner();

       if (eowner != 0) {

           HashSet<String> nodes = ephemerals.get(eowner);

           if (nodes != null) {

               synchronized (nodes) {

                   nodes.remove(path);

               }

           }

       }

       node.parent = null;

   }

   ......

}

2. save函数  

public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)

   throws IOException {

   // 获取最后处理的zxid

   long lastZxid = dataTree.lastProcessedZxid;

   // 生成snapshot文件

   File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));

   LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);

   // 序列化datatree、sessionsWithTimeouts至snapshot文件

   snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);


}

说明:save函数用于将sessions和datatree保存至snapshot文件中,其大致步骤如下

  ① 获取内存数据库中已经处理的最新的zxid,进入②

  ② 根据zxid和快照目录生成snapshot文件,进入③

  ③ 将datatree(内存数据库)、sessionsWithTimeouts序列化至快照文件。

  其他的函数或多或少都是调用TxnLog和SnapLog中的相应函数,之前已经进行过分析,这里不再累赘。

三、总结

本篇博文分析了FileTxnSnapLog的源码,其主要:

  • 封装了TxnLog和SnapLog来进行相应的处理
  • 提供了从snapshot文件和log文件中恢复内存数据库的接口
  • 其对于事务的维护,或说事务的存储结构是一个个DataTree,代码层面看到的是DataNode,其增删都是追加/删除父子节点,类似于树状操作
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
Cloud Native Java API
聊聊从单体到微服务架构服务演化过程
本文介绍了从单体应用到微服务再到云原生架构的演进过程。单体应用虽易于搭建和部署,但难以局部更新;面向服务架构(SOA)通过模块化和服务总线提升了组件复用性和分布式部署能力;微服务则进一步实现了服务的独立开发与部署,提高了灵活性;云原生架构则利用容器化、微服务和自动化工具,实现了应用在动态环境中的弹性扩展与高效管理。这一演进体现了软件架构向着更灵活、更高效的方向发展。
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
15天前
|
Kubernetes 负载均衡 Docker
构建高效后端服务:微服务架构的探索与实践
【10月更文挑战第20天】 在数字化时代,后端服务的构建对于任何在线业务的成功至关重要。本文将深入探讨微服务架构的概念、优势以及如何在实际项目中有效实施。我们将从微服务的基本理念出发,逐步解析其在提高系统可维护性、扩展性和敏捷性方面的作用。通过实际案例分析,揭示微服务架构在不同场景下的应用策略和最佳实践。无论你是后端开发新手还是经验丰富的工程师,本文都将为你提供宝贵的见解和实用的指导。
|
14天前
|
监控 API 持续交付
构建高效后端服务:微服务架构的深度探索
【10月更文挑战第20天】 在数字化时代,后端服务的构建对于支撑复杂的业务逻辑和海量数据处理至关重要。本文深入探讨了微服务架构的核心理念、实施策略以及面临的挑战,旨在为开发者提供一套构建高效、可扩展后端服务的方法论。通过案例分析,揭示微服务如何帮助企业应对快速变化的业务需求,同时保持系统的稳定性和灵活性。
44 9
|
16天前
|
监控 安全 Java
构建高效后端服务:微服务架构深度解析与最佳实践###
【10月更文挑战第19天】 在数字化转型加速的今天,企业对后端服务的响应速度、可扩展性和灵活性提出了更高要求。本文探讨了微服务架构作为解决方案,通过分析传统单体架构面临的挑战,深入剖析微服务的核心优势、关键组件及设计原则。我们将从实际案例入手,揭示成功实施微服务的策略与常见陷阱,为开发者和企业提供可操作的指导建议。本文目的是帮助读者理解如何利用微服务架构提升后端服务的整体效能,实现业务快速迭代与创新。 ###
46 2
|
21天前
|
消息中间件 Kafka 数据库
微服务架构中,如何确保服务之间的数据一致性?
微服务架构中,如何确保服务之间的数据一致性?
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
40 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
43 1
|
18天前
|
运维 Kubernetes 开发者
构建高效后端服务:微服务架构与容器化技术的结合
【10月更文挑战第18天】 在数字化转型的浪潮中,企业对后端服务的要求日益提高,追求更高的效率、更强的可伸缩性和更易于维护的系统。本文将探讨微服务架构与容器化技术如何结合,以构建一个既灵活又高效的后端服务体系。通过分析当前后端服务面临的挑战,介绍微服务和容器化的基本概念,以及它们如何相互配合来优化后端服务的性能和管理。本文旨在为开发者提供一种实现后端服务现代化的方法,从而帮助企业在竞争激烈的市场中脱颖而出。
21 0
|
2月前
|
消息中间件 Kafka 数据库
微服务架构中,如何确保服务之间的数据一致性
微服务架构中,如何确保服务之间的数据一致性

相关产品

  • 微服务引擎
  • 下一篇
    无影云桌面