一,zookeeper的ZAB协议
1,ZAB概述
ZAB:zookeeper atomic broadcast(zookeeper原子广播协议)
ZAB协议主要包括这个 原子广播 和 崩溃恢复
原子广播
就是说集群的主结点leader用来写,其他follow从结点只用来读。在主结点写完会将数据同步到从结点,只要写入成功的从结点的数量超过一半,那么这个数据就同步成功。这个主结点同步到从结点可能会有一定的延迟,因此这个zookeeper主要是为了保证这个数据的最终一致性,也可以叫为顺序一致性。
崩溃恢复
如果在主结点刚把数据写完,这个主结点挂了,那么这个集群就会重新选举新的leader。选leader的规则就是先比较zxid事务id,再比较这个机器对应的myid,谁大谁被选为leader。
二,ZAB协议流程的源码实现
需要下载zookeeper源码,可以参考上一篇:https://blog.csdn.net/zhenghuishengq/article/details/126673923?spm=1001.2014.3001.5502
1,客户端建立连接
1,先创建一个zookeeper对象
ZooKeeper zooKeeper=new ZooKeeper(...);
这个zookeeper的构造方法可能如下,里面会有很多参数,里面主要是会对创建一个connection的连接。
public ZooKeeper( String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException{ //增加一个监听机制 validateWatcher(watcher); this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig(); ConnectStringParser connectStringParser = new ConnectStringParser(connectString); this.hostProvider = hostProvider; //建立一个ClientCnxn连接,会和这个服务端建立连接 cnxn = new ClientCnxn( connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, watcher, getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly); //开始连接 cnxn.start(); }
2,然后进入这个开始连接的start方法,这个方法里面主要是会去开启两个线程,一个线程主要用来连接服务端,一个线程主要用来响应连接后的事件
public void start() { sendThread.start(); eventThread.start(); }
开启这个了线程之后,主要是查看这个线程的run方法。
接下来看第一个 sendThread 线程的run方法,主要如下,用于连接这个服务端,主要是基于nio和netty两种方式实现这个连接。在连接成功之后,会通过这个nio轮询的方式监听里面的读写事件的发生,并对这个事件进行处理
@Override public void run() { while (state.isAlive()) { try { //如果没有建立连接 if (!clientCnxnSocket.isConnected()) { onConnecting(serverAddress); //建立连接 startConnect(serverAddress); } //在建立连接之后,会通过这个nio监听这个读写事件并处理 clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); } } } private void startConnect(InetSocketAddress addr) throws IOException { //基于这个nio或者netty两种方式实现,连接这个客户端 clientCnxnSocket.connect(addr); }
接下来看第二个线程 eventThread 的run方法,主要是调用一个watcher的一个监听机制
@Override public void run() { while (true) { //从阻塞队列里面获取一个事件对象 Object event = waitingEvents.take(); //处理这个事件 processEvent(event); } } private void processEvent(Object event) { WatcherSetEventPair pair = (WatcherSetEventPair) event; //Watcher监听机制,用于事件回调 watcher.process(pair.event); }
2,客户端写数据
3,建立连接成功之后,就可以开始写数据的操作,主要是通过这个create的这个方法实现
zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
这个create的方法具体如下,就是会将这个发送的数据以及存放的路径做一个封装
public String create( final String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { //会将传进来的数据和路径封装到一个request里面 request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); //通过这个连接对象实现这个客户端向服务端发送数据 ReplyHeader r = cnxn.submitRequest(h, request, response, null); }
发送数据的具体实现如下,会对这个传来的数据进行一个打包的操作,在数据打包之后,会将这个数据存放到一个阻塞队列里面
public ReplyHeader submitRequest(...){ //对这个传过来的数据进行一个打包操作 Packet packet = queuePacket(h,r,request,response,null,null,null,null, watchRegistration,watchDeregistration); waitForPacketFinish(r, packet); } public Packet queuePacket(){ synhronized (outgoingQueue) { if (!state.isAlive() || closing) { conLossPacket(packet); } else { if (h.getType() == OpCode.closeSession) { closing = true; } //将打包的数据存放到一个阻塞队列里面 outgoingQueue.add(packet); } //唤醒被阻塞的selector,然后向这个管道写入一个数据, //这样就可以触发前面的sendThread线程,并触发里面的写事件,将数据写入到服务端 sendThread.getClientCnxnSocket().packetAdded(); } }
最终会通过这个Socket的write写入事件,将这个序列化后的数据存放到buffer里面,然后通过这个SocketChannel的write方法将数据写入到服务端。数据主要是通过这个outgoingQueue队列,以异步的方式将这个信息发送到服务端。
3,服务端接收数据
4,服务端这边主要是在这个 ServerCnxnFactory 类下面的 createFactory 方法里面来构建与客户端的连接,这个服务端接收数据主要是通过主结点和这个客户端进行一个交互
public abstract class ServerCnxnFactory { static public ServerCnxnFactory createFactory() throws IOException { //这里主要是选择nio的方式或者选择netty的方式建立一个连接 String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { //然后通过这个反射的方式找到对应的server,如使用的netty连接就会找nettyServer ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .getDeclaredConstructor().newInstance(); return serverCnxnFactory; } } }
5,接下来主要查看这个 NettyServerCnxnFactory 类的这个构造方法,底层主要是一些netty的一些实现逻辑。主要用来实现数据的传输
NettyServerCnxnFactory() { EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup( NettyUtils.getClientReachableLocalInetAddressCount()); EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.nioOrEpollServerSocketChannel()) // parent channel options .option(ChannelOption.SO_REUSEADDR, true) // child channels options .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_LINGER, -1) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (secure) { initSSL(pipeline, false); } else if (shouldUsePortUnification) { initSSL(pipeline, true); } pipeline.addLast("servercnxnfactory", channelHandler); } }); this.bootstrap = configureBootstrapAllocator(bootstrap); this.bootstrap.validate(); }
6,在建立好连接之后,服务端这边会通过这个channelRead 这个方法来读取通道的信息,就是客户端发过来的信息。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if (LOG.isTraceEnabled()) { LOG.trace("message received called {}", msg); } try { if (LOG.isDebugEnabled()) { LOG.debug("New message {} from {}", msg, ctx.channel()); } //服务端的连接对象 NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); if (cnxn == null) { } else { //处理客户端传过来的数据 cnxn.processMessage((ByteBuf) msg); } } catch (Exception ex) { LOG.error("Unexpected exception in receive", ex); throw ex; } } finally { ReferenceCountUtil.release(msg); } } //处理这个客户端传过来的数据 void processMessage(ByteBuf buf){ receiveMessage(buf); }
7,接下来就是正式的接收这个发送过来的数据,并对这个数据进行处理
private void receiveMessage(ByteBuf message) { //读取message,将数据读取到服务端的byteBuffer里面 message.readBytes(bb); //处理这个打包好的数据 zks.processPacket(this, bb); }
接下来进入这个processPacket 这个方法,主要是用来解析打包的数据。并且在服务端中,又会将这个数据进行一个打包,封装到一个request的一个阻塞队列里面,最后将这个打包的数据进行提交
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { //二进制流接收数据 InputStream bais = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); //将传送过来的序列化的数据进行一个反序列化 RequestHeader h = new RequestHeader(); h.deserialize(bia, "header"); //最后将客户端传过来的数据又封装到一个request的对象里面 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); setLocalSessionFlag(si); //提交这个数据 submitRequest(si); }
4,服务端主结点处理数据
8,就是进入上面的这个 submitRequest 的提交数据的这个方法,里面主要是通过一个processRequest的这个方法来进行处理这些数据
public void submitRequest(Request si) { firstProcessor.processRequest(si); }
9,这个firstProcessor的处理器主要是在 LeaderZooKeeperServer 类下面的这个 setupRequestProcessors 的这个方法初始化的。服务端这边主要会初始化这个Processor的一个链条,底层主要是通过这个责任链的方式实现。责任链主要是为了分工合作,模块解耦
@Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); //将上一个processor放入下一个processor,开始构建一个责任链的一个链条 RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); //获取队列的数据,并对数据进行读取 prepRequestProcessor.start(); firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); setupContainerManager(); }
这个链条如下
10,在执行这个链条的过程中,会通过这个prepRequestProcessor这个线程来读取加在服务端队列的里面的消息,接下来主要就是查看这个线程里面的run方法。这个结点其主要是为了填充这个zxid,就是事务id
@Override public void run() { try{ Request request = submittedRequests.take(); //通过这个方法进行最终的处理 pRequest(request); } }
在这个pRequest方法里面,会比较之前客户端传过来的命令,比如说create,delete命令等,那么服务端就会执行具体的操作。
protected void pRequest(Request request) throws RequestProcessorException { try { switch (request.type) { case OpCode.createContainer: case OpCode.create: case OpCode.create2: CreateRequest create2Request = new CreateRequest(); //创建命令的具体逻辑 pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true); break; case OpCode.createTTL: CreateTTLRequest createTtlRequest = new CreateTTLRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true); break; case OpCode.deleteContainer: case OpCode.delete: DeleteRequest deleteRequest = new DeleteRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); break; case OpCode.setData: SetDataRequest setDataRequest = new SetDataRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); break; ... }
接下来就是主要查看一下这个创建命令,主要是通过这个 pRequest2Txn 方法实现,并且里面的这个参数zxid,是一个automic的一个原子类型,不存在这个线程安全问题。获取的命令越新,那么这个zxid的值就越大。 每从这个内存队列里面获取一条消息,那么这个zxid的值就会加1
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException{ //将这个zxid填充回request request.zxid = zks.getZxid(); //由下一个process处理 nextProcessor.processRequest(request); }