zookeeper的ZAB协议的原理以及底层源码实现超级详解 1

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: zookeeper的ZAB协议的原理以及底层源码实现超级详解

一,zookeeper的ZAB协议

1,ZAB概述

ZAB:zookeeper atomic broadcast(zookeeper原子广播协议)

ZAB协议主要包括这个 原子广播 和 崩溃恢复


原子广播

就是说集群的主结点leader用来写,其他follow从结点只用来读。在主结点写完会将数据同步到从结点,只要写入成功的从结点的数量超过一半,那么这个数据就同步成功。这个主结点同步到从结点可能会有一定的延迟,因此这个zookeeper主要是为了保证这个数据的最终一致性,也可以叫为顺序一致性。


崩溃恢复

如果在主结点刚把数据写完,这个主结点挂了,那么这个集群就会重新选举新的leader。选leader的规则就是先比较zxid事务id,再比较这个机器对应的myid,谁大谁被选为leader。


二,ZAB协议流程的源码实现

需要下载zookeeper源码,可以参考上一篇:https://blog.csdn.net/zhenghuishengq/article/details/126673923?spm=1001.2014.3001.5502


1,客户端建立连接

1,先创建一个zookeeper对象

ZooKeeper zooKeeper=new ZooKeeper(...);

这个zookeeper的构造方法可能如下,里面会有很多参数,里面主要是会对创建一个connection的连接。

public ZooKeeper(
        String connectString,
        int sessionTimeout,
        Watcher watcher,
        long sessionId,
        byte[] sessionPasswd,
        boolean canBeReadOnly,
        HostProvider hostProvider,
        ZKClientConfig clientConfig) throws IOException{
    //增加一个监听机制
    validateWatcher(watcher);
    this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    this.hostProvider = hostProvider;
    //建立一个ClientCnxn连接,会和这个服务端建立连接
    cnxn = new ClientCnxn(
      connectStringParser.getChrootPath(),
      hostProvider,
      sessionTimeout,
      this.clientConfig,
      watcher,
      getClientCnxnSocket(),
      sessionId,
      sessionPasswd,
      canBeReadOnly);
    //开始连接
    cnxn.start();
}

2,然后进入这个开始连接的start方法,这个方法里面主要是会去开启两个线程,一个线程主要用来连接服务端,一个线程主要用来响应连接后的事件

public void start() {
    sendThread.start();
    eventThread.start();
}

开启这个了线程之后,主要是查看这个线程的run方法。

接下来看第一个 sendThread 线程的run方法,主要如下,用于连接这个服务端,主要是基于nio和netty两种方式实现这个连接。在连接成功之后,会通过这个nio轮询的方式监听里面的读写事件的发生,并对这个事件进行处理

@Override
public void run() {
    while (state.isAlive()) {
    try {
            //如果没有建立连接
          if (!clientCnxnSocket.isConnected()) {
                onConnecting(serverAddress);
                //建立连接
                startConnect(serverAddress);
            }
            //在建立连接之后,会通过这个nio监听这个读写事件并处理
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        }
    }
}
private void startConnect(InetSocketAddress addr) throws IOException {
    //基于这个nio或者netty两种方式实现,连接这个客户端
    clientCnxnSocket.connect(addr);
}

接下来看第二个线程 eventThread 的run方法,主要是调用一个watcher的一个监听机制

@Override
public void run() {
    while (true) {
        //从阻塞队列里面获取一个事件对象
      Object event = waitingEvents.take();
        //处理这个事件
        processEvent(event);
    }
}
private void processEvent(Object event) {
    WatcherSetEventPair pair = (WatcherSetEventPair) event;
    //Watcher监听机制,用于事件回调
    watcher.process(pair.event);
}

2,客户端写数据

3,建立连接成功之后,就可以开始写数据的操作,主要是通过这个create的这个方法实现

zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

这个create的方法具体如下,就是会将这个发送的数据以及存放的路径做一个封装

public String create(
        final String path,
        byte[] data,
        List<ACL> acl,
        CreateMode createMode) throws KeeperException, InterruptedException {
    //会将传进来的数据和路径封装到一个request里面
  request.setData(data);
    request.setFlags(createMode.toFlag());
    request.setPath(serverPath);
    //通过这个连接对象实现这个客户端向服务端发送数据
    ReplyHeader r = cnxn.submitRequest(h, request, response, null);
}

发送数据的具体实现如下,会对这个传来的数据进行一个打包的操作,在数据打包之后,会将这个数据存放到一个阻塞队列里面

public ReplyHeader submitRequest(...){
    //对这个传过来的数据进行一个打包操作
    Packet packet = queuePacket(h,r,request,response,null,null,null,null,
                                watchRegistration,watchDeregistration);
    waitForPacketFinish(r, packet);
}
public Packet queuePacket(){ 
    synhronized (outgoingQueue) {
      if (!state.isAlive() || closing) {
          conLossPacket(packet);
      } else {
          if (h.getType() == OpCode.closeSession) {
              closing = true;
          }
            //将打包的数据存放到一个阻塞队列里面
          outgoingQueue.add(packet);
        }
        //唤醒被阻塞的selector,然后向这个管道写入一个数据,
        //这样就可以触发前面的sendThread线程,并触发里面的写事件,将数据写入到服务端
        sendThread.getClientCnxnSocket().packetAdded();
    }
}

最终会通过这个Socket的write写入事件,将这个序列化后的数据存放到buffer里面,然后通过这个SocketChannel的write方法将数据写入到服务端。数据主要是通过这个outgoingQueue队列,以异步的方式将这个信息发送到服务端。


3,服务端接收数据

4,服务端这边主要是在这个 ServerCnxnFactory 类下面的 createFactory 方法里面来构建与客户端的连接,这个服务端接收数据主要是通过主结点和这个客户端进行一个交互

public abstract class ServerCnxnFactory {
    static public ServerCnxnFactory createFactory() throws IOException {
        //这里主要是选择nio的方式或者选择netty的方式建立一个连接
        String serverCnxnFactoryName =
            System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            //然后通过这个反射的方式找到对应的server,如使用的netty连接就会找nettyServer
            ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                    .getDeclaredConstructor().newInstance();
            return serverCnxnFactory;
        } 
    }
}

5,接下来主要查看这个 NettyServerCnxnFactory 类的这个构造方法,底层主要是一些netty的一些实现逻辑。主要用来实现数据的传输

NettyServerCnxnFactory() {
    EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(
        NettyUtils.getClientReachableLocalInetAddressCount());
        EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap()
          .group(bossGroup, workerGroup)
          .channel(NettyUtils.nioOrEpollServerSocketChannel())
          // parent channel options
          .option(ChannelOption.SO_REUSEADDR, true)
          // child channels options
          .childOption(ChannelOption.TCP_NODELAY, true)
          .childOption(ChannelOption.SO_LINGER, -1)
          .childHandler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                  ChannelPipeline pipeline = ch.pipeline();
                  if (secure) {
                      initSSL(pipeline, false);
                  } else if (shouldUsePortUnification) {
                      initSSL(pipeline, true);
                  }
                  pipeline.addLast("servercnxnfactory", channelHandler);
              }
          });
   this.bootstrap = configureBootstrapAllocator(bootstrap);
   this.bootstrap.validate();
}

6,在建立好连接之后,服务端这边会通过这个channelRead 这个方法来读取通道的信息,就是客户端发过来的信息。

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     try {
         if (LOG.isTraceEnabled()) {
             LOG.trace("message received called {}", msg);
         }
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("New message {} from {}", msg, ctx.channel());
             }
             //服务端的连接对象
             NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
            if (cnxn == null) {
            } else {
                //处理客户端传过来的数据
                cnxn.processMessage((ByteBuf) msg);
            }
        } catch (Exception ex) {
            LOG.error("Unexpected exception in receive", ex);
            throw ex;
        }
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
//处理这个客户端传过来的数据
void processMessage(ByteBuf buf){
    receiveMessage(buf);
}

7,接下来就是正式的接收这个发送过来的数据,并对这个数据进行处理

private void receiveMessage(ByteBuf message) {
    //读取message,将数据读取到服务端的byteBuffer里面
    message.readBytes(bb);
    //处理这个打包好的数据
    zks.processPacket(this, bb);
}

接下来进入这个processPacket 这个方法,主要是用来解析打包的数据。并且在服务端中,又会将这个数据进行一个打包,封装到一个request的一个阻塞队列里面,最后将这个打包的数据进行提交

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    //二进制流接收数据
  InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    //将传送过来的序列化的数据进行一个反序列化
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");
    //最后将客户端传过来的数据又封装到一个request的对象里面
    Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
    h.getType(), incomingBuffer, cnxn.getAuthInfo());
    si.setOwner(ServerCnxn.me);
    setLocalSessionFlag(si);
    //提交这个数据
    submitRequest(si);
}

4,服务端主结点处理数据

8,就是进入上面的这个 submitRequest 的提交数据的这个方法,里面主要是通过一个processRequest的这个方法来进行处理这些数据

public void submitRequest(Request si) {
    firstProcessor.processRequest(si);
}

9,这个firstProcessor的处理器主要是在 LeaderZooKeeperServer 类下面的这个 setupRequestProcessors 的这个方法初始化的。服务端这边主要会初始化这个Processor的一个链条,底层主要是通过这个责任链的方式实现。责任链主要是为了分工合作,模块解耦

@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    //将上一个processor放入下一个processor,开始构建一个责任链的一个链条
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
  commitProcessor = new CommitProcessor(toBeAppliedProcessor,
          Long.toString(getServerId()), false,
          getZooKeeperServerListener());
  commitProcessor.start();
  ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
          commitProcessor);
  proposalProcessor.initialize();
  prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
  //获取队列的数据,并对数据进行读取
  prepRequestProcessor.start();
  firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
  setupContainerManager();
}

这个链条如下


040afec1799b4c598554a5a49e41fcc0.png

10,在执行这个链条的过程中,会通过这个prepRequestProcessor这个线程来读取加在服务端队列的里面的消息,接下来主要就是查看这个线程里面的run方法。这个结点其主要是为了填充这个zxid,就是事务id

@Override
public void run() {
    try{
        Request request = submittedRequests.take();
        //通过这个方法进行最终的处理
        pRequest(request);
    }
}

在这个pRequest方法里面,会比较之前客户端传过来的命令,比如说create,delete命令等,那么服务端就会执行具体的操作。

protected void pRequest(Request request) throws RequestProcessorException {
    try {
      switch (request.type) {
      case OpCode.createContainer:
      case OpCode.create:
      case OpCode.create2:
          CreateRequest create2Request = new CreateRequest();
            //创建命令的具体逻辑
          pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
          break;
      case OpCode.createTTL:
          CreateTTLRequest createTtlRequest = new CreateTTLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
                break;
        case OpCode.deleteContainer:
        case OpCode.delete:
            DeleteRequest deleteRequest = new DeleteRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
              break;
        case OpCode.setData:
            SetDataRequest setDataRequest = new SetDataRequest();                
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                break;
        ...
}

接下来就是主要查看一下这个创建命令,主要是通过这个 pRequest2Txn 方法实现,并且里面的这个参数zxid,是一个automic的一个原子类型,不存在这个线程安全问题。获取的命令越新,那么这个zxid的值就越大。 每从这个内存队列里面获取一条消息,那么这个zxid的值就会加1

protected void pRequest2Txn(int type, long zxid, Request request,
                            Record record, boolean deserialize)
    throws KeeperException, IOException, RequestProcessorException{
    //将这个zxid填充回request
    request.zxid = zks.getZxid();
    //由下一个process处理
    nextProcessor.processRequest(request);
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
分布式计算 负载均衡 算法
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
28 1
|
1月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
39 1
|
2月前
|
监控
分布式-Zookeeper-Zab协议
分布式-Zookeeper-Zab协议
|
3月前
|
消息中间件 分布式计算 安全
这一次,彻底弄懂ZooKeeper协议
ZooKeeper是动物园的意思,在2012年官方来给ZooKeeper写了这么一段有趣的“ZooKeeper之道”,难怪ZooKeeper现在发展得这么好。动物园管理员对他们负责的动物和参观动物的游客都尽心尽力。他们遵循一套守则,至今只有同行才知道,这套守则可以保证动物和游客的安全。大家好,我是南哥。一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。
134 4
这一次,彻底弄懂ZooKeeper协议
|
4月前
|
Nacos 微服务
Zookeeper 的 ZAB 协议 以及 zookeeper 与 nacos 注册中心比对
Zookeeper 的 ZAB 协议 以及 zookeeper 与 nacos 注册中心比对
90 4
|
6月前
|
消息中间件
【ZooKeeper系列】那ZooKeeper为什么还采用ZAB协议
ZooKeeper的流程是这样的,针对客户端的事务请求,Leader服务器会为其生成对应的事务Proposal,并发送给集群中其余机器,然后再分别收集各自的选票。因为ZAB协议将二阶段提交中的事务中断逻辑移除,所以只需要收集过半Follower服务器的反馈Ack后即可,最后就是进行事务提交。
【ZooKeeper系列】那ZooKeeper为什么还采用ZAB协议
|
6月前
|
存储 负载均衡 网络协议
ZooKeeper【基础 01】简介+设计目标+核心概念+ZAB协议+典型应用场景
【4月更文挑战第10天】ZooKeeper【基础 01】简介+设计目标+核心概念+ZAB协议+典型应用场景
91 1
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)