45-微服务技术栈(高级):分布式协调服务zookeeper源码篇(持久化FileTxnSnapLog)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类。

一、前言

  前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类。

二、FileTxnSnapLog源码分析

2.1 类的属性

public class FileTxnSnapLog {

   //the direcotry containing the

   //the transaction logs

   // 日志文件目录

   private final File dataDir;

   //the directory containing the

   //the snapshot directory

   // 快照文件目录

   private final File snapDir;

   // 事务日志

   private TxnLog txnLog;

   // 快照

   private SnapShot snapLog;

   // 版本号

   public final static int VERSION = 2;

   // 版本

   public final static String version = "version-";


   // Logger

   private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLog.class);

}

说明:类的属性中包含了TxnLog和SnapShot接口,即对FileTxnSnapLog的很多操作都会转发给TxnLog和SnapLog进行操作,这是一种典型的组合方法。

2.2 内部类

  FileTxnSnapLog包含了PlayBackListener内部类,用来接收事务应用过程中的回调,在Zookeeper数据恢复后期,会有事务修正过程,此过程会回调PlayBackListener来进行对应的数据修正。其源码如下 

public interface PlayBackListener {

   void onTxnLoaded(TxnHeader hdr, Record rec);

}

 说明:在完成事务操作后,会调用到onTxnLoaded方法进行相应的处理。

2.3 构造函数

  FileTxnSnapLog的构造函数如下

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {

   LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

   // 在datadir和snapdir下生成version-2目录

   this.dataDir = new File(dataDir, version + VERSION);

   this.snapDir = new File(snapDir, version + VERSION);

   if (!this.dataDir.exists()) { // datadir存在但无法创建目录,则抛出异常

       if (!this.dataDir.mkdirs()) {

           throw new IOException("Unable to create data directory "

                                 + this.dataDir);

       }

   }

   if (!this.snapDir.exists()) { // snapdir存在但无法创建目录,则抛出异常

       if (!this.snapDir.mkdirs()) {

           throw new IOException("Unable to create snap directory "

                                 + this.snapDir);

       }

   }

   // 给属性赋值

   txnLog = new FileTxnLog(this.dataDir);

   snapLog = new FileSnap(this.snapDir);

}

说明:对于构造函数而言,其会在传入的datadir和snapdir目录下新生成version-2的目录,并且会判断目录是否创建成功,之后会创建txnLog和snapLog。

2.4 核心函数分析

1. restore函数 

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {

   // 根据snap文件反序列化dt和sessions

   snapLog.deserialize(dt, sessions);

   //

   FileTxnLog txnLog = new FileTxnLog(dataDir);

   // 获取比最后处理的zxid+1大的log文件的迭代器

   TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);

   // 最大的zxid

   long highestZxid = dt.lastProcessedZxid;


   TxnHeader hdr;

   try {

       while (true) {

           // itr在read函数调用后就已经指向第一个合法的事务

           // 获取事务头

           hdr = itr.getHeader();

           if (hdr == null) { // 事务头为空

               // 表示日志文件为空

               return dt.lastProcessedZxid;

           }

           // 事务头的zxid小于snapshot中的最大zxid并且其不为0,则会报错,因为上面获取的是+1

           if (hdr.getZxid() < highestZxid && highestZxid != 0) {

               LOG.error("{}(higestZxid) > {}(next log) for type {}",

                         new Object[] { highestZxid, hdr.getZxid(),

                                       hdr.getType() });

           } else {

               // 重新赋值highestZxid

               highestZxid = hdr.getZxid();

           }

           try {

               // 在datatree上处理事务

               processTransaction(hdr,dt,sessions, itr.getTxn());

           } catch(KeeperException.NoNodeException e) {

               throw new IOException("Failed to process transaction type: " +

                                     hdr.getType() + " error: " + e.getMessage(), e);

           }

           // 每处理完一个事务都会进行回调

           listener.onTxnLoaded(hdr, itr.getTxn());

           if (!itr.next()) // 已无事务,跳出循环

               break;

       }

   } finally {

       if (itr != null) { // 迭代器不为空,则关闭

           itr.close();

       }

   }

   // 返回最高的zxid

   return highestZxid;

}

说明:restore用于恢复datatree和sessions,其步骤大致如下

  ① 根据snapshot文件反序列化datatree和sessions,进入②

  ② 获取比snapshot文件中的zxid+1大的log文件的迭代器,以对log文件中的事务进行迭代,进入③

  ③ 迭代log文件的每个事务,并且将该事务应用在datatree中,同时会调用onTxnLoaded函数进行后续处理,进入④

  ④ 关闭迭代器,返回log文件中最后一个事务的zxid(作为最高的zxid)

其中会调用到FileTxnLog的read函数,read函数在FileTxnLog中已经进行过分析,会调用processTransaction函数,其源码如下 

public void processTransaction(TxnHeader hdr,DataTree dt,

                              Map<Long, Integer> sessions, Record txn)

   throws KeeperException.NoNodeException {

   // 事务处理结果

   ProcessTxnResult rc;

   switch (hdr.getType()) { // 确定事务类型

       case OpCode.createSession: // 创建会话

           // 添加进会话

           sessions.put(hdr.getClientId(),

                        ((CreateSessionTxn) txn).getTimeOut());

           if (LOG.isTraceEnabled()) {

               ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,

                                        "playLog --- create session in log: 0x"

                                        + Long.toHexString(hdr.getClientId())

                                        + " with timeout: "

                                        + ((CreateSessionTxn) txn).getTimeOut());

           }

           // give dataTree a chance to sync its lastProcessedZxid

           // 处理事务

           rc = dt.processTxn(hdr, txn);

           break;

       case OpCode.closeSession: // 关闭会话

           // 会话中移除

           sessions.remove(hdr.getClientId());

           if (LOG.isTraceEnabled()) {

               ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,

                                        "playLog --- close session in log: 0x"

                                        + Long.toHexString(hdr.getClientId()));

           }

           // 处理事务

           rc = dt.processTxn(hdr, txn);

           break;

       default:

           // 处理事务

           rc = dt.processTxn(hdr, txn);

   }


   /**

        * Snapshots are lazily created. So when a snapshot is in progress,

        * there is a chance for later transactions to make into the

        * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS

        * errors could occur. It should be safe to ignore these.

        */

   if (rc.err != Code.OK.intValue()) { // 忽略处理结果中可能出现的错误

       LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()

                 + ", error: " + rc.err + ", path: " + rc.path);

   }

}

说明:processTransaction会根据事务头中记录的事务类型(createSession、closeSession、其他类型)来进行相应的操作,对于createSession类型而言,其会将会话和超时时间添加至会话map中,对于closeSession而言,会话map会根据客户端的id号删除其会话(因为存储时候key=clientId),同时,所有的操作都会调用到dt.processTxn函数,其源码如下 

public ProcessTxnResult processTxn(TxnHeader header, Record txn)

{

   // 事务处理结果

   ProcessTxnResult rc = new ProcessTxnResult();

   try {

       // 从事务头中解析出相应属性并保存至rc中

       rc.clientId = header.getClientId();

       rc.cxid = header.getCxid();

       rc.zxid = header.getZxid();

       rc.type = header.getType();

       rc.err = 0;

       rc.multiResult = null;

       switch (header.getType()) { // 确定事务类型

           case OpCode.create: // 创建结点

               // 显示转化

               CreateTxn createTxn = (CreateTxn) txn;

               // 获取创建结点路径

               rc.path = createTxn.getPath();

               // 创建结点

               createNode(

                   createTxn.getPath(),

                   createTxn.getData(),

                   createTxn.getAcl(),

                   createTxn.getEphemeral() ? header.getClientId() : 0,

                   createTxn.getParentCVersion(),

                   header.getZxid(), header.getTime());

               break;

           case OpCode.delete: // 删除结点

               // 显示转化

               DeleteTxn deleteTxn = (DeleteTxn) txn;

               // 获取删除结点路径

               rc.path = deleteTxn.getPath();

               // 删除结点

               deleteNode(deleteTxn.getPath(), header.getZxid());

               break;

           case OpCode.setData: // 写入数据

               // 显示转化

               SetDataTxn setDataTxn = (SetDataTxn) txn;

               // 获取写入数据结点路径

               rc.path = setDataTxn.getPath();

               // 写入数据

               rc.stat = setData(setDataTxn.getPath(), setDataTxn

                                 .getData(), setDataTxn.getVersion(), header

                                 .getZxid(), header.getTime());

               break;

           case OpCode.setACL: // 设置ACL

               // 显示转化

               SetACLTxn setACLTxn = (SetACLTxn) txn;

               // 获取路径

               rc.path = setACLTxn.getPath();

               // 设置ACL

               rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),

                                setACLTxn.getVersion());

               break;

           case OpCode.closeSession: // 关闭会话

               // 关闭会话

               killSession(header.getClientId(), header.getZxid());

               break;

           case OpCode.error: // 错误

               // 显示转化

               ErrorTxn errTxn = (ErrorTxn) txn;

               // 记录错误

               rc.err = errTxn.getErr();

               break;

           case OpCode.check: // 检查

               // 显示转化

               CheckVersionTxn checkTxn = (CheckVersionTxn) txn;

               // 获取路径

               rc.path = checkTxn.getPath();

               break;

           case OpCode.multi: // 多个事务

               // 显示转化

               MultiTxn multiTxn = (MultiTxn) txn ;

               // 获取事务列表

               List<Txn> txns = multiTxn.getTxns();

               rc.multiResult = new ArrayList<ProcessTxnResult>();

               boolean failed = false;

               for (Txn subtxn : txns) { // 遍历事务列表

                   if (subtxn.getType() == OpCode.error) {

                       failed = true;

                       break;

                   }

               }

               boolean post_failed = false;

               for (Txn subtxn : txns) { // 遍历事务列表,确定每个事务类型并进行相应操作

                   // 处理事务的数据

                   ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());

                   Record record = null;

                   switch (subtxn.getType()) {

                       case OpCode.create:

                           record = new CreateTxn();

                           break;

                       case OpCode.delete:

                           record = new DeleteTxn();

                           break;

                       case OpCode.setData:

                           record = new SetDataTxn();

                           break;

                       case OpCode.error:

                           record = new ErrorTxn();

                           post_failed = true;

                           break;

                       case OpCode.check:

                           record = new CheckVersionTxn();

                           break;

                       default:

                           throw new IOException("Invalid type of op: " + subtxn.getType());

                   }

                   assert(record != null);

                   // 将bytebuffer转化为record(初始化record的相关属性)

                   ByteBufferInputStream.byteBuffer2Record(bb, record);


                   if (failed && subtxn.getType() != OpCode.error){ // 失败并且不为error类型

                       int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()

                           : Code.OK.intValue();


                       subtxn.setType(OpCode.error);

                       record = new ErrorTxn(ec);

                   }


                   if (failed) { // 失败

                       assert(subtxn.getType() == OpCode.error) ;

                   }


                   // 生成事务头

                   TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),

                                                    header.getZxid(), header.getTime(),

                                                    subtxn.getType());

                   // 递归调用处理事务

                   ProcessTxnResult subRc = processTxn(subHdr, record);

                   // 保存处理结果

                   rc.multiResult.add(subRc);

                   if (subRc.err != 0 && rc.err == 0) {

                       rc.err = subRc.err ;

                   }

               }

               break;

       }

   } catch (KeeperException e) {

       if (LOG.isDebugEnabled()) {

           LOG.debug("Failed: " + header + ":" + txn, e);

       }

       rc.err = e.code().intValue();

   } catch (IOException e) {

       if (LOG.isDebugEnabled()) {

           LOG.debug("Failed: " + header + ":" + txn, e);

       }

   }

   /*

        * A snapshot might be in progress while we are modifying the data

        * tree. If we set lastProcessedZxid prior to making corresponding

        * change to the tree, then the zxid associated with the snapshot

        * file will be ahead of its contents. Thus, while restoring from

        * the snapshot, the restore method will not apply the transaction

        * for zxid associated with the snapshot file, since the restore

        * method assumes that transaction to be present in the snapshot.

        *

        * To avoid this, we first apply the transaction and then modify

        * lastProcessedZxid.  During restore, we correctly handle the

        * case where the snapshot contains data ahead of the zxid associated

        * with the file.

        */

   // 事务处理结果中保存的zxid大于已经被处理的最大的zxid,则重新赋值

   if (rc.zxid > lastProcessedZxid) {

       lastProcessedZxid = rc.zxid;

   }


   /*

        * Snapshots are taken lazily. It can happen that the child

        * znodes of a parent are created after the parent

        * is serialized. Therefore, while replaying logs during restore, a

        * create might fail because the node was already

        * created.

        *

        * After seeing this failure, we should increment

        * the cversion of the parent znode since the parent was serialized

        * before its children.

        *

        * Note, such failures on DT should be seen only during

        * restore.

        */

   if (header.getType() == OpCode.create &&

       rc.err == Code.NODEEXISTS.intValue()) { // 处理在恢复数据过程中的结点创建操作

       LOG.debug("Adjusting parent cversion for Txn: " + header.getType() +

                 " path:" + rc.path + " err: " + rc.err);

       int lastSlash = rc.path.lastIndexOf('/');

       String parentName = rc.path.substring(0, lastSlash);

       CreateTxn cTxn = (CreateTxn)txn;

       try {

           setCversionPzxid(parentName, cTxn.getParentCVersion(),

                            header.getZxid());

       } catch (KeeperException.NoNodeException e) {

           LOG.error("Failed to set parent cversion for: " +

                     parentName, e);

           rc.err = e.code().intValue();

       }

   } else if (rc.err != Code.OK.intValue()) {

       LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +

                 " : error: " + rc.err);

   }

   return rc;

}

说明:processTxn用于处理事务,即将事务操作应用到DataTree【参见下面源码】内存数据库中,以恢复成最新的数据。其操作所有数据节点都是借助于DataNode,数据会类似于树状存储,

以删除为例:

public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException {

   int lastSlash = path.lastIndexOf('/');

   String parentName = path.substring(0, lastSlash);

   String childName = path.substring(lastSlash + 1);

   DataNode node = nodes.get(path);

   if (node == null) {

       throw new KeeperException.NoNodeException();

   }

   nodes.remove(path);

   DataNode parent = nodes.get(parentName);

   if (parent == null) {

       throw new KeeperException.NoNodeException();

   }

   synchronized (parent) {

       parent.removeChild(childName);

       parent.stat.setPzxid(zxid);

       long eowner = node.stat.getEphemeralOwner();

       if (eowner != 0) {

           HashSet<String> nodes = ephemerals.get(eowner);

           if (nodes != null) {

               synchronized (nodes) {

                   nodes.remove(path);

               }

           }

       }

       node.parent = null;

   }

   ......

}

2. save函数  

public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)

   throws IOException {

   // 获取最后处理的zxid

   long lastZxid = dataTree.lastProcessedZxid;

   // 生成snapshot文件

   File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));

   LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);

   // 序列化datatree、sessionsWithTimeouts至snapshot文件

   snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);


}

说明:save函数用于将sessions和datatree保存至snapshot文件中,其大致步骤如下

  ① 获取内存数据库中已经处理的最新的zxid,进入②

  ② 根据zxid和快照目录生成snapshot文件,进入③

  ③ 将datatree(内存数据库)、sessionsWithTimeouts序列化至快照文件。

  其他的函数或多或少都是调用TxnLog和SnapLog中的相应函数,之前已经进行过分析,这里不再累赘。

三、总结

本篇博文分析了FileTxnSnapLog的源码,其主要:

  • 封装了TxnLog和SnapLog来进行相应的处理
  • 提供了从snapshot文件和log文件中恢复内存数据库的接口
  • 其对于事务的维护,或说事务的存储结构是一个个DataTree,代码层面看到的是DataNode,其增删都是追加/删除父子节点,类似于树状操作
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
131 2
|
2天前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
4天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
4天前
使用JWT的服务分布式部署之后报错:JWT Check Failure:
使用JWT的服务分布式部署之后报错:JWT Check Failure:
54 1
|
4天前
|
存储 缓存 NoSQL
缓存、分布式缓存和持久化
这篇内容介绍了缓存的概念和Redis的作用,以口袋与公文包的比喻解释了缓存如何提高数据访问速度。Redis是一个内存中的高级缓存系统,能提升系统响应速度。接着讨论了为何需要分布式缓存,通过多个“篮子”(Redis节点)解决单点故障和性能瓶颈,保证高可用性和数据安全性。最后提到了Redis的两种持久化机制——RDB(定期数据快照)和AOF(记录写操作日志),分别用照片备份和实时同步来比喻,说明它们在数据丢失风险和恢复速度上的权衡。
|
1天前
|
运维 监控 Docker
使用Docker进行微服务架构的部署
【5月更文挑战第18天】本文探讨了如何使用Docker进行微服务架构部署,介绍了Docker的基本概念,如容器化平台和核心组件,以及它与微服务的关系。通过Docker,每个微服务可独立运行在容器中,便于构建、测试和部署。文章详细阐述了使用Docker部署微服务的步骤,包括定义服务、编写Dockerfile、构建镜像、运行容器、配置服务通信、监控和日志管理以及扩展和更新。Docker为微服务提供了可移植、可扩展的解决方案,是现代微服务架构的理想选择。
|
1天前
|
敏捷开发 监控 API
构建高效微服务架构:从理论到实践
【5月更文挑战第18天】 在当今快速发展的软件开发领域,微服务架构已经成为一种流行的设计模式,它通过将大型应用程序分解为一系列小型、独立的服务来提高系统的可伸缩性、弹性和维护性。本文旨在探讨如何从理论走向实践,构建一个高效的微服务架构。文章首先介绍微服务的基本概念和优势,然后详细讨论了在设计和部署微服务时需要考虑的关键因素,包括服务划分、通信机制、数据一致性、容错处理和监控策略。最后,结合具体案例分析,展示如何在现实世界中应用这些原则,确保微服务架构的高效运行。
|
1天前
|
存储 弹性计算 运维
探索微服务架构下的服务治理
【5月更文挑战第18天】 在当今软件工程领域,微服务架构因其灵活性、可扩展性以及促进团队协作等优势而受到广泛青睐。然而,随着系统规模的增长和服务数量的膨胀,服务治理成为确保系统稳定性和高效性的关键因素。本文将深入探讨微服务环境下的服务治理实践,包括服务发现、配置管理、负载均衡、故障处理等关键方面,旨在为开发者提供一套行之有效的服务治理策略。
|
1天前
|
监控 持续交付 开发者
构建高效微服务架构:后端开发的新范式
【5月更文挑战第18天】 随着现代软件开发的复杂性日益增长,传统的单体应用架构已难以满足快速迭代和灵活部署的需求。本文聚焦于一种新兴的解决方案——微服务架构,探讨其如何为后端开发带来革命性的改变。我们将深入分析微服务的核心概念、优势与挑战,并通过具体案例来阐述如何在实际项目中实施微服务架构。文章旨在为开发者提供一种系统化的方法,帮助他们理解并应用微服务架构,以提升系统的可维护性、扩展性和技术敏捷性。
9 2
|
1天前
|
测试技术 持续交付 API
构建高效的微服务架构:后端开发的现代实践
【5月更文挑战第18天】在数字化转型的浪潮中,微服务架构已成为企业追求敏捷、可扩展和容错能力的关键解决方案。本文将深入探讨微服务的核心概念,包括其设计原则、技术栈选择以及实施过程中的挑战与对策。通过对微服务架构实践的详细剖析,旨在为后端开发人员提供一套构建和维护高效微服务系统的实用指南。

相关产品

  • 微服务引擎