Hadoop-2.7.0中HDFS NameNode HA实现之DFSZKFailoverController、ZKFailoverController(一)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 一、简介      DFSZKFailoverController是Hadoop-2.7.0中HDFS NameNode HA实现的中心组件,它负责整体的故障转移控制等。它是一个守护进程,通过main()方法启动,继承自ZKFailoverController。

一、简介

      DFSZKFailoverController是Hadoop-2.7.0中HDFS NameNode HA实现的中心组件,它负责整体的故障转移控制等。它是一个守护进程,通过main()方法启动,继承自ZKFailoverController。

二、实现流程

      1、启动

       通过main()方法启动,如下:

  /**
   * 进程启动的main()方法
   */
  public static void main(String args[])
      throws Exception {
	  
	//  解析参数
    if (DFSUtil.parseHelpArgument(args, 
        ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }
    
    GenericOptionsParser parser = new GenericOptionsParser(
        new HdfsConfiguration(), args);
    
    // 通过静态方法创建DFSZKFailoverController实例zkfc
    DFSZKFailoverController zkfc = DFSZKFailoverController.create(
        parser.getConfiguration());
    int retCode = 0;
    try {
      // 调用zkfc的run()方法,执行主业务逻辑
      retCode = zkfc.run(parser.getRemainingArgs());
    } catch (Throwable t) {
      LOG.fatal("Got a fatal error, exiting now", t);
    }
    System.exit(retCode);
  }
      解析参数,然后通过静态方法构造一个zkfc实例,调用zkfc的run()方法,执行主业务逻辑。

      2、实例化

      实例化是在静态方法create中完成的,如下:

  /**
   * 对象实例化用的静态方法
   */
  public static DFSZKFailoverController create(Configuration conf) {
    
	// 获取本地NameNode配置信息  
	Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
    // 获取该NameNode的命名服务ID:NamenodeNameServiceId
	String nsId = DFSUtil.getNamenodeNameServiceId(conf);

	// 检测是否支持HA,不支持的话直接抛出异常
    if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
      throw new HadoopIllegalArgumentException(
          "HA is not enabled for this namenode.");
    }
    
    // 获取NameNode ID,并校验
    String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
    if (nnId == null) {
      String msg = "Could not get the namenode ID of this node. " +
          "You may run zkfc on the node other than namenode.";
      throw new HadoopIllegalArgumentException(msg);
    }
    
    // NameNode初始化通用keys
    NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
    DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);
    
    // 构造NNHAServiceTarget实例localTarget,NNHAServiceTarget继承自HAServiceTarget,代表一个客户端HA管理命令的目标。其实就是封装了网络连接的相关地址参数
    NNHAServiceTarget localTarget = new NNHAServiceTarget(
        localNNConf, nsId, nnId);
    
    // 利用本地配置、localTarget构造DFSZKFailoverController
    return new DFSZKFailoverController(localNNConf, localTarget);
  }
      主要就是构造DFSZKFailoverController对象,并在此之前,通过配置信息,获取一些前置参数,比如:

      1)NameNode的命名服务ID:NamenodeNameServiceId;

      2)NameNode ID;

      3)构造NNHAServiceTarget实例localTarget,NNHAServiceTarget继承自HAServiceTarget,代表一个客户端HA管理命令的目标。其实就是封装了网络连接的相关地址参数,包括socket地址等;
      上述这些在ZooKeeper上注册节点时,是需要通过protobuf序列化后写入的节点数据的。

      3、运行

      运行时通过父类run()、doRun()方法实现的,如下:

  private int doRun(String[] args)
      throws HadoopIllegalArgumentException, IOException, InterruptedException {
    try {
      // 初始化zookeeper,
      initZK();
    } catch (KeeperException ke) {
      LOG.fatal("Unable to start failover controller. Unable to connect "
          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
          + "ZooKeeper is running.");
      return ERR_CODE_NO_ZK;
    }
    if (args.length > 0) {
      if ("-formatZK".equals(args[0])) {
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
          if ("-force".equals(args[i])) {
            force = true;
          } else if ("-nonInteractive".equals(args[i])) {
            interactive = false;
          } else {
            badArg(args[i]);
          }
        }
        return formatZK(force, interactive);
      } else {
        badArg(args[0]);
      }
    }

    if (!elector.parentZNodeExists()) {
      LOG.fatal("Unable to start failover controller. "
          + "Parent znode does not exist.\n"
          + "Run with -formatZK flag to initialize ZooKeeper.");
      return ERR_CODE_NO_PARENT_ZNODE;
    }

    try {
      localTarget.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
          "You must configure a fencing method before using automatic " +
          "failover.", e);
      return ERR_CODE_NO_FENCER;
    }

    // 初始化rpc
    initRPC();
    
    // 初始化健康检查器
    initHM();
    
    // 启动rpc
    startRPC();
    
    // 主循环
    try {
      mainLoop();
    } finally {
      
      // 停止rpc服务
      rpcServer.stopAndJoin();
      
      // 选举器退出选举
      elector.quitElection(true);
      
      // 停止健康检查器
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
  }
      首先会做一些初始化操作,比如初始化zookeeper、初始化rpc、初始化健康监视器、启动rpc,然后进入主循环,当主循环退出时,再完成一些停止或关闭操作,比如:停止rpc服务、选举器退出选举、停止健康检查器等。

      (一)初始化

      1)zk初始化:

      主要是从配置信息中获取ZooKeeper连接参数,比如zkQuorum、zkTimeout、zkAcls、zkAuths、getParentZnode(ha根路径)、maxRetryNum等,然后创建选举器ActiveStandbyElector实例elector,并注册一个回调函数ElectorCallbacks。

      HA在ZooKeeper上的路径取自参数ha.zookeeper.parent-znode,默认为/hadoop-ha。

      关于回调函数的作用,在后续文章中单独介绍。

      2)rpc初始化与启动:

      rpc的初始化主要是构造一个ZKFCRpcServer对象,绑定socket地址,然后start()启动。

      3)健康监视器初始化与启动:

      健康监视器的初始化则是构造HealthMonitor对象,传入NN地址封装类HAServiceTarget实例localTarget,然后注册状态回调函数、注册服务状态回调函数,最后启动,如下:

  /**
   * 初始化健康监视器:HealthMonitor
   */
  private void initHM() {
	  
	// 构造健康监视器HealthMonitor实例
    healthMonitor = new HealthMonitor(conf, localTarget);
    
    // 注册状态回调函数
    healthMonitor.addCallback(new HealthCallbacks());
    
    // 注册服务状态回调函数
    healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
    
    // 启动健康监视器
    healthMonitor.start();
  }
      这个状态回调函数和服务状态回调函数是在健康监视器有结果时调用的,主要是设置上次健康状态并复核选举可能性。

      (二)主循环

        主循环的逻辑比较简单,如下:

  private synchronized void mainLoop() throws InterruptedException {
    
	// 如果没有致命错误的话,一直等待  
	while (fatalError == null) {
      wait();
    }
    
	// 如果有致命错误,抛出运行时异常
	assert fatalError != null; // only get here on fatal
    throw new RuntimeException(
        "ZK Failover Controller failed: " + fatalError);
  }
      1)如果没有致命错误的话,一直调用wait()进行等待 ;
      2)如果有致命错误,抛出运行时异常。

      (三)停止

        停止比较简单,调用一些列方法实现即可。


      剩余的回调函数、复核选举可能性等细节请关注后续文章。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
153 6
|
1月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
62 3
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
42 4
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
46 2
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
86 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
38 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
48 0
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
65 2
|
20天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
71 2
|
21天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
59 1

相关实验场景

更多
下一篇
无影云桌面