HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:         在《HDFS源码分析心跳汇报之数据结构初始化》一文中,我们了解到HDFS心跳相关的BlockPoolManager、BPOfferService、BPServiceActor三者之间的关系,并且知道最终HDFS的心跳是通过BPServiceActor线程实现的。

        在《HDFS源码分析心跳汇报之数据结构初始化》一文中,我们了解到HDFS心跳相关的BlockPoolManager、BPOfferService、BPServiceActor三者之间的关系,并且知道最终HDFS的心跳是通过BPServiceActor线程实现的。那么,这个BPServiceActor线程到底是如何工作的呢?本文,我们将继续HDFS心跳分析之BPServiceActor工作线程运行流程。

        首先,我们先看下

        那么,BPServiceActor线程是通过什么样的流程来实现心跳的呢?我们来看下它正常工作的run()方法,代码如下:

  /**
   * No matter what kind of exception we get, keep retrying to offerService().
   * That's the loop that connects to the NameNode and provides basic DataNode
   * functionality.
   *
   * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
   * happen either at shutdown or due to refreshNamenodes.
   */
  @Override
  public void run() {
	  
	// 记录日志信息:starting to offer service
    LOG.info(this + " starting to offer service");

    try {
    	
      // 在一个while循环内,完成连接NameNode并握手操作,即初始化
      while (true) {
        // init stuff
        try {
          // setup storage
          // 连接NameNode并握手
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
        	
          // 如果存在异常
        	
          // Initial handshake, storage recovery or registration failed
        	
          // 现将运行状态runningState设置为初始化失败INIT_FAILED
          runningState = RunningState.INIT_FAILED;
          
          // 调用shouldRetryInit()方法判断初始化失败时是否可以重试
          if (shouldRetryInit()) {
        	  
            // Retry until all namenode's of BPOS failed initialization
        	// 记录error日志信息
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            
            // 线程休眠5s,并记录info日志信息,之后再进入循环重复执行之前的操作
            sleepAndLogInterrupts(5000, "initializing");
            
          } else {
        	  
        	// 不允许重试的情况下,将运行状态runningState设置为失败FAILED,退出循环,并返回
            runningState = RunningState.FAILED;
            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }

      // 设置运行状态runningState为正在运行RUNNING
      runningState = RunningState.RUNNING;

      // 进入另一个while循环,不停的调用offerService()方法,
      // 发送心跳给NameNode并接收来自NameNode,然后根据命令交给不同的组件去处理
      // 循环的条件就是该线程的标志位shouldServiceRun为true,且dataNode的shouldRun()返回true
      while (shouldRun()) {
        try {
          offerService();
        } catch (Exception ex) {
        	
          // 存在异常的话,记录error日志,并休眠5s
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      
      // 设置运行状态runningState为已退出EXITED
      runningState = RunningState.EXITED;
      
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      
      // 清空,释放占用的资源
      cleanUp();
    }
  }
        在run()方法的开始,也就是BPServiceActor线程刚启动时,会在一个while循环内,完成连接NameNode并握手操作,即初始化。这里,是通过调用connectToNNAndHandshake()方法完成与NameNode的连接并进行两次握手的。值得一提的是,如果出现了IOException异常,会先将运行状态runningState设置为初始化失败INIT_FAILED,然后调用shouldRetryInit()方法判断初始化失败时是否可以重试:

        1、如果可以重试的话,记录error日志信息,线程休眠5s,并记录info日志信息,之后再进入循环重复执行之前的操作;

        2、不允许重试的情况下,将运行状态runningState设置为失败FAILED,退出循环,并返回。

        接下来,设置运行状态runningState为正在运行RUNNING,进入另一个while循环,不停的调用offerService()方法,发送心跳给NameNode并接收来自NameNode的命令,然后根据命令交给不同的组件去处理,循环的条件就是该线程的标志位shouldServiceRun为true,且dataNode的shouldRun()返回true;

        而当循环过程中存在异常Exception的话,记录error日志,并休眠5s,然后继续循环,只有当shouldRun()方法返回false,才退出循环,设置运行状态runningState为已退出EXITED,最终调用cleanUp()方法释放占用的资源等。

        上述就是BPServiceActor线程正常工作进行周期性心跳的主流程。下面,我们针对其中的某些细节进行详细描述。        

        首先,完成与NameNode的连接并进行两次握手的connectToNNAndHandshake()方法,实现如下:

  /**
   * 连接NameNode并握手
   */
  private void connectToNNAndHandshake() throws IOException {
    
	// get NN proxy
	// 利用DataNode实例dn的connectToNN()方法和NameNode地址nnAddr获得NameNode的代理bpNamenode
    bpNamenode = dn.connectToNN(nnAddr);

    // First phase of the handshake with NN - get the namespace
    // info.
    // 与NameNode握手第一阶段:获取命名空间信息
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
    
    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    // 验证,并设置命名空间信息()
    bpos.verifyAndSetNamespaceInfo(nsInfo);
    
    // Second phase of the handshake with the NN.
    // 与NameNode握手第二阶段,注册
    register();
  }
        它的主要处理流程如下:

        1、利用DataNode实例dn的connectToNN()方法和NameNode地址nnAddr获得NameNode的代理bpNamenode
        2、与NameNode握手第一阶段:调用retrieveNamespaceInfo()方法获取命名空间信息nsInfo;

        3、通过bpos的verifyAndSetNamespaceInfo()方法进行验证,并设置命名空间信息nsInfo;

        4、与NameNode握手第二阶段,调用register()方法进行注册。

        接着,我们再看下实现周期性心跳的offerService()方法,代码如下:


相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
2月前
|
Java 测试技术 API
【JUC】(1)带你重新认识进程与线程!!让你深层次了解线程运行的睡眠与打断!!
JUC是什么?你可以说它就是研究Java方面的并发过程。本篇是JUC专栏的第一章!带你了解并行与并发、线程与程序、线程的启动与休眠、打断和等待!全是干货!快快快!
432 2
|
2月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
199 1
|
3月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
5月前
|
人工智能 Java Linux
Go 调度器:一个线程的执行流程
本文详细解析了Go语言运行时调度器的初始化流程,重点介绍了GMP模型的构建过程。内容涵盖调度器初始化函数`runtime·schedinit`、线程与处理器的绑定、P结构体的创建与初始化,以及主Goroutine的启动流程。通过源码分析,帮助读者深入理解Go运行时的底层机制。
115 0
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
286 1
|
存储 安全
HDFS读写流程详解
HDFS读写流程详解
1177 2
HDFS读写流程详解
|
11月前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
795 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
线程CPU异常定位分析
【10月更文挑战第3天】 开发过程中会出现一些CPU异常升高的问题,想要定位到具体的位置就需要一系列的分析,记录一些分析手段。
290 0
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
246 12
|
12月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
445 4

热门文章

最新文章