深入理解 ZK集群如何保证数据一致性(二)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 深入理解 ZK集群如何保证数据一致性(二)

源码入口#


单机版本还是集群版本的启动流程中,前部分几乎是相同的,一直到QuorumPeerMain.javainitializeAndRun()方法,单机模式下运行的是ZooKeeperServerMain.main(args);, 集群模式下,运行的是runFromConfig(config);

因此当前博客从QuorumPeerMainrunFromConfig()开始

其中的QuorumPeer.java可以看成ZK集群中的每一个server实体,下面代码大部分篇幅是在当前server的属性完成初始化


// todo 集群启动的逻辑
    public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      } 
      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());
          // todo new QuorumPeer()  可以理解成, 创建了集群中的一个server
          quorumPeer = getQuorumPeer();
          // todo 将配置文件中解析出来的文件原封不动的赋值给我们的new 的QuorumPeer
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }
          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();
          // todo 着重看这个方法
          quorumPeer.start();
          quorumPeer.join();


跟进quorumPeer.start()方法,源码如下, 主要做了如下几件事

  • 数据恢复
  • 经过上下文的工厂,启动这个线程类,使当前的server拥有接受client请求的能力(但是RequestProcessor没有初始化,因此它能接受request,却不能处理request)
  • 选举Leader,在这个过程中会在Follower中选举出一个leader,确立好集群中的 Leader,Follower,Observer的三大角色
  • 启动当前线程类QuorumPeer.java


@Override
public synchronized void start() {
    // todo 从磁盘中加载数据到内存中
    loadDataBase();
    // todo 启动上下文的这个工厂,他是个线程类, 接受客户端的请求
    cnxnFactory.start();
    // todo 开启leader的选举工作
    startLeaderElection();
    // todo 确定服务器的角色, 启动的就是当前类的run方法在900行
    super.start();
}


看一下QuorumPeer.java的run方法,部分源码如下,逻辑很清楚通过了上面的角色的选举之后,集群中各个节点的角色已经确定下来了,那拥有不同角色的节点就会进入下面代码中不同的case分支中


  • looking : 正在进行领导者的选举
  • observer: 观察者
  • leading : 集群的leader
  • following: 集群的Follower


while (running) {
                switch (getPeerState()) {
        case LOOKING:
            LOG.info("LOOKING");
            if (Boolean.getBoolean("readonlymode.enabled")) {
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                // Create read-only server but don't start it immediately
                final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                        logFactory, this,
                        new ZooKeeperServer.BasicDataTreeBuilder(),
                        this.zkDb);
                Thread roZkMgr = new Thread() {
                    public void run() {
                        try {
                            // lower-bound grace period to 2 secs
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) {
                                roZk.startup();
                            }
                        } catch (InterruptedException e) {
                            LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                        } catch (Exception e) {
                            LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                        }
                    }
                };
                try {
                    roZkMgr.start();
                    setBCVote(null);
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                    setPeerState(ServerState.LOOKING);
                } finally {
                    roZkMgr.interrupt();
                    roZk.shutdown();
                }
            } else {
                try {
                    setBCVote(null);
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
            }
            break;
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e );                        
            } finally {
                observer.shutdown();
                setObserver(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case FOLLOWING:
            // todo server 当选follow角色
            try {
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                follower.shutdown();
                setFollower(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case LEADING:
            // todo 服务器成功当选成leader
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                // todo 跟进lead
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                setPeerState(ServerState.LOOKING);
            }
            break;
        }
    }


下面看一下,server当选成不同的角色后,后干了什么


总览Leader&Follower#


当选成Leader#


跟进源码,上面代码片段中makeLeader() 由这个方法创建了一个Leader的封装类


protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
        // todo 跟进它的Leader 构造方法
        return new Leader(this, new LeaderZooKeeperServer(logFactory,
                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }



这是LeaderZooKeeperServer的继承图,可以看到其实他继承了单机模式下的ZKServer


调用leader.lead()方法,这个方法主要做了如下几件事

  • 创建了StateSummary对象
  • 这个对象封装了zxid以及currentEpoch, 其中zxid就是最后一次和znode相关的事务id,后者是当前的epoch 它有64位,高32位标记是第几代Leader,后32位是当前代leader提交的事务次数,Follower只识别高版本的前32位为Leader
  • 针对每一个Learner都开启了一条新的线程LearnerCnxAcceptor,这条线程负责Leader和Learner(Observer+Follower)之间的IO交流
  • LearnerCnxAcceptorrun()方法中,只要有新的连接来了,新开启了一条新的线程,LearnerHander,由他负责Leader中接受每一个参议员的packet,以及监听新连接的到来
  • leader启动...


void lead() throws IOException, InterruptedException {
        zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
        try {
            self.tick.set(0);
            zk.loadData();
            // todo  创建了  封装有状态比较逻辑的对象
            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
            // todo 创建一个新的线程,为了新的 followers 来连接
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();
            readyToStart = true;
            // todo
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
            synchronized(this){
                lastProposed = zk.getZxid();
            }
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);
            if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                LOG.info("NEWLEADER proposal has Zxid of "
                        + Long.toHexString(newLeaderProposal.packet.getZxid()));
            }
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);
            try {
                waitForNewLeaderAck(self.getId(), zk.getZxid());
            } catch (InterruptedException e) {
                shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                        + getSidSetString(newLeaderProposal.ackSet) + " ]");
                HashSet<Long> followerSet = new HashSet<Long>();
                for (LearnerHandler f : learners)
                    followerSet.add(f.getSid());
                if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                    LOG.warn("Enough followers present. "
                            + "Perhaps the initTicks need to be increased.");
                }
                Thread.sleep(self.tickTime);
                self.tick.incrementAndGet();
                return;
            }
            // todo 启动server
            startZkServer();
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.cnxnFactory.setZooKeeperServer(zk);
            }
            boolean tickSkip = true;
            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }
                HashSet<Long> syncedSet = new HashSet<Long>();
                // lock on the followers when we use it.
                syncedSet.add(self.getId());
                for (LearnerHandler f : getLearners()) {
                    // Synced set is used to check we have a supporting quorum, so only
                    // PARTICIPANT, not OBSERVER, learners should be used
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }
                // check leader running status
                if (!this.isRunning()) {
                    shutdown("Unexpected internal error");
                    return;
                }
              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                    // Lost quorum, shutdown
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    // make sure the order is the same!
                    // the leader goes to looking
                    return;
              } 
              tickSkip = !tickSkip;
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }


当选成Follower#


通过上面的case分支进入FOLLOWING块,进入followerLeader方法

下面的Follower.java中的代码的主要逻辑:

  • 和Leader建立起连接
  • registerWithLeader()注册进Leader
  • syncWithLeader()从Leader中同步数据并完成启动
  • while(true){...}中接受leader发送过来的packet,处理packet


void followLeader() throws InterruptedException {
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
        // todo 找出LeaderServer
        QuorumServer leaderServer = findLeader();            
        try {
            // todo 和Leader建立连接
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            // todo 注册在leader上(会往leader上发送数据)
            //todo 这个Epoch代表当前是第几轮选举leader, 这个值给leader使用,由leader从接收到的最大的epoch中选出最大的,然后统一所有learner中的epoch值
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            // todo 从leader同步数据, 同时也是在这个方法中完成初始化启动的
            syncWithLeader(newEpochZxid);
            QuorumPacket qp = new QuorumPacket();
            // todo 在follower中开启无线循环, 不停的接收服务端的pakcet,然后处理packet
            while (this.isRunning()) {
                readPacket(qp);
                // todo (接受leader发送的提议)
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX((Learner)this);
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
3月前
|
消息中间件 分布式计算 算法
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
65 5
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
|
5月前
|
存储 关系型数据库 MySQL
MySQL主从同步如何保证数据一致性?
MySQL主从同步如何保证数据一致性?
407 0
MySQL主从同步如何保证数据一致性?
|
6月前
|
存储 数据库
zookeeper 集群环境搭建及集群选举及数据同步机制
zookeeper 集群环境搭建及集群选举及数据同步机制
133 2
|
6月前
|
消息中间件 Kafka 索引
微服务数据问题之Broker宕机MetaQ保证数据的可靠性如何解决
微服务数据问题之Broker宕机MetaQ保证数据的可靠性如何解决
|
6月前
|
消息中间件 存储 负载均衡
Kafka高可用性指南:提高数据一致性和集群容错能力!
**Kafka高可用性概览** - 创建Topic时设置`--replication-factor 3`确保数据冗余和高可用。 - 分配角色:Leader处理读写,Follower同步数据,简化管理和客户端逻辑。 - ISR(In-Sync Replicas)保持与Leader同步的副本列表,确保数据一致性和可靠性。 - 设置`acks=all`保证消息被所有副本确认,防止数据丢失,增强一致性。 - 通过这些机制,Kafka实现了分布式环境中的数据可靠性、一致性及服务的高可用性。
848 0
|
6月前
|
消息中间件 存储 Kafka
深入Kafka:如何保证数据一致性与可靠性?
**Kafka一致性详解:** 讲解了幂等性如何通过ProducerID和SequenceNumber确保消息唯一,防止重复处理,维持数据一致性。Kafka利用Zookeeper进行控制器和分区Leader选举,应对节点变动,防止脑裂,确保高可用性。实例中,电商平台用Kafka处理订单,保证每个订单仅处理一次,即使在异常情况下。关注微信公众号“软件求生”获取更多技术内容。
938 0
|
8月前
|
存储 负载均衡 容灾
架构设计|基于 raft-listener 实现实时同步的主备集群
本文介绍如何从数据库内核角度建立一套实时同步的主备集群,确保线上业务的高可用性和可靠性。本系统采用双 AZ 主备容灾机制,并要求数据与 schema 实时同步,同步时延平均在 1 秒内,p99 在 2 秒内。此外,系统支持高效的自动或手动主备切换,并能在切换过程中恢复丢失数据。
127 0
|
8月前
|
网络协议 中间件 数据库
Zookeeper学习系列【三】Zookeeper 集群架构、读写机制以及一致性原理(ZAB协议)
Zookeeper学习系列【三】Zookeeper 集群架构、读写机制以及一致性原理(ZAB协议)
295 0
|
存储 缓存 算法
深入理解 ZK集群如何保证数据一致性(一)
深入理解 ZK集群如何保证数据一致性(一)
626 0
|
算法 Java 大数据
ZooKeeper 如何保证数据的一致性?【重要】
ZooKeeper 如何保证数据的一致性?【重要】
179 0

热门文章

最新文章

下一篇
开通oss服务