启动单节点#
这次再进入这个方法,我们直接跳过它是如果从配置文件中读取出配置信息了,然后直接看它的启动方法
runFromConfig方法
主要做了如下几件事
- 创建
ZooKeeperServer
它是单机ZK服务端的实例
如下的ZooKeeperServer相关的属性 private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; protected RequestProcessor firstProcessor 以及它可以构建DataTree
- 创建
ZooKeeperServerShutdownHandler
监控ZkServer关闭状态的处理器 - 创建
FileTxnSnapLog
文件快照相关的工具类 - 给ZKServer绑定上
单位时间trickTime
(节点心跳交流的时间) - 初始化 ZKServer
处理事务,快照相关的工具类
- 创建上下文的工厂
- 通过工厂,启动上下文
public void runFromConfig(ServerConfig config) throws IOException { LOG.info("Starting server"); FileTxnSnapLog txnLog = null; try { // Note that this thread isn't going to be doing anything else, // so rather than spawning another thread, we will just call run() in this thread. // todo 请注意,当前线程不会做其他任何事情,因此我们只在当前线程中调用Run方法,而不是开启新线程 // create a file logger url from the command line args // todo 根据命令中的args 创建一个logger文件 final ZooKeeperServer zkServer = new ZooKeeperServer(); // Registers shutdown handler which will be used to know the server error or shutdown state changes. // todo 注册一个shutdown handler, 通过他了解server发生的error或者了解shutdown 状态的更改 final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch)); // todo FileTxnSnapLog工具类, 与 文件快照相关 txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir)); txnLog.setServerStats(zkServer.serverStats()); zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout); // todo 创建Server上下文的工厂,工厂方法模式 // todo ServerCnxnFactory是个抽象类,他有不同是实现, NIO版本的 Netty版本的 cnxnFactory = ServerCnxnFactory.createFactory(); // todo 建立socket,默认是NIOServerCnxnFactory(是一个线程) cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // todo 跟进这个方法 cnxnFactory.startup(zkServer);
- 看一下如何创建处理事务,快照日志相关的数据文件的逻辑,可以看到,直接去关联我们配置的dataDir,snapDir,对应着日志存储的目录已经快照存储的目录, 然后封装进
FileSnap
和FileTxnLog
对象中
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir); // todo 关联上指定数据文件和日志文件 // todo 给FileTxnSnapLog赋值 this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); if (!this.dataDir.exists()) { ... . . // todo 将这两个文件封装进 FileTxnLog 给当前类维护的两种事务快照( TnxnSnap ) 赋值 txnLog = new FileTxnLog(this.dataDir); snapLog = new FileSnap(this.snapDir);
- 上下文工厂
如上图,将ServerCnxnFactory.java
的继承图,不同的上下文工厂的实现可以创建出不同的上下文,通过这个图可以看到,不仅支持传统的NIO,还有一套Netty的实现,当前我选择的是原生的实现NIOServerCnxnFactory的实现,那么由他创建出来的就是NIOServerCnxn
启动流程如下图
上下文工厂实例化服务端的NIOSocket
#
在这个方法中创建了ZooKeeperThread
,这个类ZK中设计的线程类,几乎全部的线程都由此类完成,当前方法中的做法是将创建的Thread赋值给了当前的类的引用,实际上约等于当前类就是线程类,还有需要注意的地方就是虽然进行了初始化,但是并没有开启
此处看到的就是java原生的NIO Socket编程, 当前线程类被设置成守护线程
Thread thread; @Override public void configure(InetSocketAddress addr, int maxcc) throws IOException { configureSaslLogin(); // todo 把当前类作为线程 thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr); //todo 所以这里的这个线程是为了和JVM生命周期绑定,只剩下这个线程时已经没有意义了,应该关闭掉。 thread.setDaemon(true); maxClientCnxns = maxcc; // todo 看到了NIO原生的代码,使用打开服务端的 Channel, 绑定端口,设置为非阻塞,注册上感兴趣的事件是 accept 连接事件 this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }
- 由上下文工厂实例化的
NIOServerCnxn
下面是它的属性,可以看到其实这个上下文涵盖的很全面,甚至服务端的ZK都被他维护着,
NIOServerCnxnFactory factory; final SocketChannel sock; protected final SelectionKey sk; boolean initialized; ByteBuffer lenBuffer = ByteBuffer.allocate(4); ByteBuffer incomingBuffer = lenBuffer; LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>(); int sessionTimeout; protected final ZooKeeperServer zkServer;
上下文工厂(ServerFactoryCnxn)启动#
看完了ZooKeeperServerMain
中runFromConfig
方法中的创建ZKServer,FileTxnSnapLog
等重要对象的逻辑,下面,上下文启动, 直接点击去查看这个方法,肯定直接进入ServerFactoryCnxn
,我们选择的是它的实现类NIOServerCnxnFactory
public void runFromConfig(ServerConfig config) throws IOException { . . . cnxnFactory.startup(zkServer);
下面是NIOServerCnxnFactory
的实现,它做的第一件事就是开启上面实例化的所说的线程类,这条线程的开启标记着,服务端从此可以接收客户端发送的请求了
这个方法还做了如下三件事
- 将ZooKeeperServer交给上下文维护
- 因为这个是启动,所以从磁盘中完成数据的恢复
- 继续运行
- 创建计时器
- 开启计时器
- 开启三条处理器
- 注册JMX
- 修改运行的状态
- 唤醒全部线程
public void startup(ZooKeeperServer zks) throws IOException, InterruptedException { // todo start(); ==> run() 开启线程 start(); //todo 实现在上面, 到目前为止服务端已经可以接受客户端的请求了 // todo 将ZKS 交给NIOServerCnxnFactory管理,意味着NIOServerCnxnFactory是目前来说,服务端功能最多的对象 setZooKeeperServer(zks); // todo 因为是服务端刚刚启动,需要从从disk将数据恢复到内存 zks.startdata(); // todo 继续跟进 zks.startup(); }
完成数据的恢复#
跟进startData()方法
, 看到先创建ZKDatabase
,这个对象就是存在于内存中的对象,对磁盘中数据可视化描述
// todo 将数据加载进缓存中 public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { // todo 如果没初始化的话就初始化 zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { // todo 恢复数据 loadData(); } }
跟进创建ZKDataBase的逻辑, 最直观的可以看见,这个DB维护了DataTree和SnapLog
public ZKDatabase(FileTxnSnapLog snapLog) { // todo 创建了DataTree 数据树的空对象 dataTree = new DataTree(); sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>(); //todo 用初始化好了的存有关于系统事务日志将snaplog初始化 this.snapLog = snapLog; }
loaddata()
public void loadData() throws IOException, InterruptedException { // todo zkDatabase 已经初始化了 if(zkDb.isInitialized()){ // todo zxid = 最近的一次znode的事务id setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { //todo zkDB 没有初始化就使用 zkDb.loadDataBase() , 跟进去看, 他从快照中获取数据 setZxid(zkDb.loadDataBase()); } // Clean up dead sessions LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } zkDb.setDataTreeInit(true); for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } }
- 继续启动
zks.startup();
它的源码在下面,其中的计时器类也是一个线程类
// todo 继续启动, 服务端和客户端建立连接后会保留一个session, 其中这个sessiion的生命周期倒计时就在下面的 createSessionTracker(); public synchronized void startup() { if (sessionTracker == null) { // todo 创建session计时器 createSessionTracker(); } // todo 开启计时器 startSessionTracker(); // todo 设置请求处理器, zookeeper中存在不同的请求处理器, 就在下面 setupRequestProcessors(); //todo 是一个为应用程序、设备、系统等植入管理功能的框架。 //todo JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活的开发无缝集成的系统、网络和服务管理应用 registerJMX(); // todo 修改状态 --> running setState(State.RUNNING); // todo 唤醒所有线程, 因为前面有一个线程等待处理器 睡了一秒 notifyAll(); }
设置请求处理器#
着重看一下它的setupRequestProcessors()
添加请求处理器,单机模式下仅仅存在三个处理器,除了最后一个不是线程类之外,其他两个都是线程类
- PrepRequestProcessor
- 校验权限
- 修改请求的状态
- SyncRequestProcessor
- 将request持久化日志文件
- 打快照
- FinalRequestProcessor
- 响应客户端的请求
protected void setupRequestProcessors() { // todo 下面的三个处理器的第二个参数是在指定 下一个处理器是谁 RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); // todo 在服务端, 数据的处理 socket -> packet -> request -> queue // todo 然后由下面的requestprocessor 链 进行下一步处理request // todo 开启新线程, 服务端接收的客户端的请求都放在了 队列中,用处理器异步处理 ((SyncRequestProcessor)syncProcessor).start(); //todo 第一个处理器 , 下一个处理器是 syncProcessor 最后一个处理器 finalProcessor firstProcessor = new PrepRequestProcessor(this, syncProcessor); // todo 开启新线程 服务端接收的客户端的请求都放在了 队列中,用处理器异步处理 ((PrepRequestProcessor)firstProcessor).start(); }