HDFS源码分析数据块复制监控线程ReplicationMonitor(一)

简介:         ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列。其定义及作为线程核心的run()方法如下: /** * Periodically calls computeReplicationWork().

        ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列。其定义及作为线程核心的run()方法如下:

  /**
   * Periodically calls computeReplicationWork().
   * 周期性调用computeReplicationWork()方法
   */
  private class ReplicationMonitor implements Runnable {

    @Override
    public void run() {
    	
      // 如果namesystem持续运行,while循环一直进行
      while (namesystem.isRunning()) {
        try {
          // Process replication work only when active NN is out of safe mode.
          if (namesystem.isPopulatingReplQueues()) {
            // 计算数据节点工作
        	computeDatanodeWork();
        	// 将复制请求超时的块重新加入到待调度队列
            processPendingReplications();
          }
          
          // 线程休眠replicationRecheckInterval时间
          Thread.sleep(replicationRecheckInterval);
        } catch (Throwable t) {
          if (!namesystem.isRunning()) {
            LOG.info("Stopping ReplicationMonitor.");
            if (!(t instanceof InterruptedException)) {
              LOG.info("ReplicationMonitor received an exception"
                  + " while shutting down.", t);
            }
            break;
          } else if (!checkNSRunning && t instanceof InterruptedException) {
            LOG.info("Stopping ReplicationMonitor for testing.");
            break;
          }
          LOG.fatal("ReplicationMonitor thread received Runtime exception. ", t);
          terminate(1, t);
        }
      }
    }
  }
        ReplicationMonitor线程的run()方法运行逻辑比较清晰,如果namesystem持续运行,while循环一直进行,在这个循环内,仅当活跃NN不在安全模式时才会进行复制工作:

        1、调用computeDatanodeWork()方法计算数据节点工作;

        2、调用processPendingReplications()方法将复制请求超时的块重新加入到待调度队列

        3、线程休眠replicationRecheckInterval时间后继续运行。

        首先说下这个replicationRecheckInterval,它是名字节点检查新的复制工作的时间间隔,其初始化在BlockManager的构造函数中,代码如下:

    this.replicationRecheckInterval = 
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                  DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
        其取值取参数dfs.namenode.replication.interval,参数未配置的话,默认为3秒。

        再来看下计算数据节点工作的computeDatanodeWork()方法,它负责计算块复制、块无效工作可以被调度到数据节点的总数,数据节点将在接下来的心跳中被指派该工作,并返回被调度的复制或移除的块的数目,代码如下:

  /**
   * Compute block replication and block invalidation work that can be scheduled
   * on data-nodes. The datanode will be informed of this work at the next
   * heartbeat.
   * 
   * 计算块复制、块无效工作可以被调度到数据节点的总数。数据节点将在接下来的心跳中被指派该工作。
   * 返回被调度的复制或移除的块的数目
   * 
   * @return number of blocks scheduled for replication or removal.
   */
  int computeDatanodeWork() {
    // Blocks should not be replicated or removed if in safe mode.
    // It's OK to check safe mode here w/o holding lock, in the worst
    // case extra replications will be scheduled, and these will get
    // fixed up later.
	  
	// 如果namesystem处于安全模式,直接返回0
    if (namesystem.isInSafeMode()) {
      return 0;
    }

    // 通过心跳管理器heartbeatManager获取存活数据节点数
    final int numlive = heartbeatManager.getLiveDatanodeCount();
    
    // blocksReplWorkMultiplier为集群每个周期每个DataNode平均待复制的数据块数量,
    // blocksToProcess为每个周期集群需要复制的数据块数量
    final int blocksToProcess = numlive
        * this.blocksReplWorkMultiplier;
    
    // blocksInvalidateWorkPct为集群每个周期每个DataNode平均待删除的无效数据块百分比
    // nodesToProcess为集群每个周期待删除的无效数据块数量
    final int nodesToProcess = (int) Math.ceil(numlive
        * this.blocksInvalidateWorkPct);

    // 计算复制工作量workFound
    int workFound = this.computeReplicationWork(blocksToProcess);

    // Update counters
    // namesystem加写锁
    namesystem.writeLock();
    try {
    	
      // 调用updateState()方法更新相关状态
      this.updateState();
      
      // 将计算得到的复制工作量workFound赋值给被调度复制的数据块数scheduledReplicationBlocksCount
      this.scheduledReplicationBlocksCount = workFound;
    } finally {
    	
      // namesystem释放写锁
      namesystem.writeUnlock();
    }
    
    // 计算删除无效块工作量,并累加到workFound
    workFound += this.computeInvalidateWork(nodesToProcess);
    
    // 返回总工作量workFound
    return workFound;
  }
       computeDatanodeWork()方法的处理逻辑大体如下:

        1、如果namesystem处于安全模式,直接返回0;

        2、通过心跳管理器heartbeatManager获取存活数据节点数numlive;

        3、计算每个周期集群需要复制的数据块数量blocksToProcess:存活数据节点数numlive乘以集群每个周期每个DataNode平均待复制的数据块数量blocksReplWorkMultiplier,blocksReplWorkMultiplier取参数dfs.namenode.replication.work.multiplier.per.iteration,参数未配置的话默认为2;

        4、计算集群每个周期待删除的无效数据块数量nodesToProcess:存活数据节点数numlive乘以集群每个周期每个DataNode平均待删除的无效数据块百分比blocksInvalidateWorkPct,blocksInvalidateWorkPct取参数dfs.namenode.invalidate.work.pct.per.iteration,参数未配置的话默认为0.32f,计算结果向上取整;

        5、调用computeReplicationWork()方法,传入blocksToProcess,计算复制工作量workFound;

        6、namesystem加写锁;

        7、调用updateState()方法更新相关状态;

        8、将计算得到的复制工作量workFound赋值给被调度复制的数据块数scheduledReplicationBlocksCount;

        9、namesystem释放写锁;

        10、调用computeInvalidateWork()方法,传入nodesToProcess(),计算删除无效块工作量,并累加到workFound;

        11、返回总工作量workFound。

        下面,我们看下计算复制工作量的computeReplicationWork()方法,代码如下:

  /**
   * Scan blocks in {@link #neededReplications} and assign replication
   * work to data-nodes they belong to.
   *
   * The number of process blocks equals either twice the number of live
   * data-nodes or the number of under-replicated blocks whichever is less.
   *
   * @return number of blocks scheduled for replication during this iteration.
   */
  int computeReplicationWork(int blocksToProcess) {
    List<List<Block>> blocksToReplicate = null;
    
    // namesystem加写锁
    namesystem.writeLock();
    try {
      // Choose the blocks to be replicated
      // 通过neededReplications的chooseUnderReplicatedBlocks()方法,
      // 选取blocksToProcess个待复制的数据块,放入blocksToReplicate列表,
      // blocksToReplicate是一个数据块列表的列表,外层的位置索引代表数据块复制的优先级
      blocksToReplicate = neededReplications
          .chooseUnderReplicatedBlocks(blocksToProcess);
    } finally {
    	
      // namesystem释放写锁
      namesystem.writeUnlock();
    }
    
    // 调用computeReplicationWorkForBlocks()方法,进行实际数据块复制操作,传入待复制数据块列表的列表,位置索引代表复制的优先级
    return computeReplicationWorkForBlocks(blocksToReplicate);
  }
        computeReplicationWork()方法比较短,逻辑也很清晰,如下:

        1、namesystem加写锁;

        2、通过neededReplications的chooseUnderReplicatedBlocks()方法,选取blocksToProcess个待复制的数据块,放入blocksToReplicate列表,blocksToReplicate是一个数据块列表的列表,外层的位置索引代表数据块复制的优先级:

        关于如何通过neededReplications的chooseUnderReplicatedBlocks()方法选取blocksToProcess个待复制的数据块,请参考《HDFS源码分析之UnderReplicatedBlocks(二)》一文;

        3、namesystem释放写锁;

        4、调用computeReplicationWorkForBlocks()方法,进行实际数据块复制操作,传入待复制数据块列表的列表,位置索引代表复制的优先级。




相关文章
|
3月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
71 1
|
3月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
73 3
|
3月前
线程CPU异常定位分析
【10月更文挑战第3天】 开发过程中会出现一些CPU异常升高的问题,想要定位到具体的位置就需要一系列的分析,记录一些分析手段。
87 0
|
2月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
190 64
|
1月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
56 4
|
2月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
119 38
|
2月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
340 2
|
3月前
|
监控 数据可视化 Java
如何使用JDK自带的监控工具JConsole来监控线程池的内存使用情况?
如何使用JDK自带的监控工具JConsole来监控线程池的内存使用情况?
|
4月前
|
Arthas 监控 Java
监控线程池的内存使用情况以预防内存泄漏
监控线程池的内存使用情况以预防内存泄漏
|
4月前
|
监控 数据可视化 Java
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况

热门文章

最新文章