Hadoop2源码分析-HDFS核心模块分析

简介:

1.概述

  这篇博客接着《Hadoop2源码分析-RPC机制初识》 来讲述,前面我们对MapReduce、序列化、RPC进行了分析和探索,对Hadoop V2的这些模块都有了大致的了解,通过对这些模块的研究,我们明白了MapReduce的运行流程以及内部的实现机制,Hadoop的序列化以及它的通信 机制(RPC)。今天我们来研究另一个核心的模块,那就是Hadoop的分布式文件存储系统——HDFS,下面是今天分享的内容目录:

  • HDFS简述
  • NameNode
  • DataNode

  接下来,我们开始今天的分享内容。

2.HDFS简述

  HDFS全称Hadoop Distributed File System,在HDFS中有几个基本的概念,首先是它的数据块(Block),HDFS的设计是用于支持大文件的。运行在HDFS上的程序也是用于处理 大数据集的。这些程序仅写一次数据,一次或多次读数据请求,并且这些读操作要求满足流式传输速度。HDFS支持文件的一次写多次读操作。HDFS中典型的 块大小是64MB,一个HDFS文件可以被被切分成多个64MB大小的块,如果需要,每一个块可以分布在不同的数据节点上。HDFS 中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间。

  HDFS提供了一个可操作文件系统的抽象类org.apache.hadoop.fs.FileSystem,该类被划分在Hadoop- Common部分,其源码地址为:hadoop-2.6.0-src/hadoop-common-project/hadoop-common/src /main/java/org/apache/hadoop/fs/FileSystem.java,如下是FileSystem的部分源码,如下所示:

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable {
        // 代码内容省略
        // ...            
}

  我们可以使用着抽象类,去操作HDFS系统上的内容,实现代码如下所示:

private static void dfs() {
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);// get file object
            FileStatus[] list = fs.listStatus(new Path("/"));// file status list
            for (FileStatus file : list) {
                LOGGER.info(file.getPath().getName());// print file names
            }
        } catch (IOException e) {
            e.printStackTrace();
            LOGGER.error("Get hdfs path has error,msg is " + e.getMessage());
        } finally {
            try {
                if (fs != null) {
                    fs.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
                LOGGER.error("Close fs object has error,msg is " + e.getMessage());
            }
        }
    }

  下面,我们来看另一个概念是元数据节点(Namenode)和数据节点(datanode),这2个是HDFS的核心模块,下面我们分别来看看这2个核心模块。

3.NameNode

  NN节点用来管理文件系统的NameSpace,将所有的文件和文件夹的Meta保存在一个文件系统中,是HDFS中文件目录和文件分配的管理者,保存的重要信息如下所示:

  在HDFS集群上可能包含成百上千个DataNode(简称DN)节点,这些DN节点定时和NameNode(简称NN)节点保持通信,接受 NN节点的一些指令,为了减小NN的压力,NN上并不永久存储那个DN上报的数据块信息,而是通过DN上报的状态来更新NN上的映射表信息。DN和NN建 立连接后,会和NN保持心跳,心跳返回的信息包含了NN对DN的一些指令信息,如删除数据,复制数据到其他的DN节点。值得注意的是NN不会主动去请求 DN,这是一个严格意义上的C/S架构模型,同时,客户端在操作HDFS集群时,DN节点会互相配合,保证数据的一致性。

  NN节点信息存储,部分截图信息如下所示:

4.DataNode

  下面我们来分析一下DN的实现,DN的实现包含以下部分,一部分是对本地Block的管理,另一部分就是和其他的Entity进行数据交互。首 先,我们先看本地的Block管理部分。我们在搭建Hadoop集群时,会指定Block的存储路径,我们可以找到配置的存储路径,在hdfs- site.xml文件下,内容路径如下所示:



<property>
        <name>dfs.datanode.data.dir</name>
        <value>/home/hadoop/data/dfs/data</value>
</property>

  然后,我们进入到DN节点上,找到对应的存储目录,如下图所示:

  这里面in_use.lock的作用是做一个排斥操作,在对应的应用上面加锁。然后current目录存放的是当前有效的Block,进入到current目录后,出现如下图所示的目录:

  VERSION存放着一些文件的Meta,接着还有一系列的Block文件和Meta文件,Block文件是存储了HDFS中的数据的。存储的 Block,一个Block在多个DN节点上有备份,其备份参数可以调节,在hdfs-site.xml文件中,属性设置如下所示:



<property>
        <name>dfs.replication</name>
        <value>3</value>
</property>

  首先,我们来看DateNode的类,部分代码如下所示:

@VisibleForTesting
  @InterfaceAudience.Private
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    DataNode dn = instantiateDataNode(args, conf, resources);// init dn
    if (dn != null) {
      dn.runDatanodeDaemon();// register to nn and back to dn thread
    }
    return dn;
  }


/** Instantiate a single datanode object, along with its secure resources. 
   * This must be run by invoking{@link DataNode#runDatanodeDaemon()} 
   * subsequently. 
   */
  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();
    
    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }
    
    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
    return makeInstance(dataLocations, conf, resources);
  }


static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");

    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);// create dn obejct
  }


public void runDatanodeDaemon() throws IOException {
    blockPoolManager.startAll();

    // start dataXceiveServer
    dataXceiverServer.start();
    if (localDataXceiverServer != null) {
      localDataXceiverServer.start();
    }
    ipcServer.start();
    startPlugins(conf);
  }


public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null) {
        datanode.join();
      } else {
        errorCode = 1;
      }
    } catch (Throwable e) {
      LOG.fatal("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      // We need to terminate the process here because either shutdown was called
      // or some disk related conditions like volumes tolerated or volumes required
      // condition was not met. Also, In secure mode, control will go to Jsvc
      // and Datanode process hangs if it does not exit.
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }

  • Main函数入口

  下面给出DN类的Main函数入口,代码片段如下所示:

public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    secureMain(args, null);
  }

5.总结

  在研究HDFS的相关模块时,这里需要明白各个模块的功能及作用,这里为大家介绍了DN类的部分代码片段,以及给代码片段重要部分添加了代码注释,若是大家需要了解详细的相关流程及代码,可以阅读Hadoop的HDFS部分的源代码。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

目录
相关文章
|
10月前
|
API 开发工具 iOS开发
iOS 开发高效率工具包:10 大必备工具
iOS 开发高效率工具包:10 大必备工具
142 1
|
7月前
|
数据采集 DataWorks 数据挖掘
提升数据分析效率:DataWorks在企业级数据治理中的应用
【8月更文第25天】本文将探讨阿里巴巴云的DataWorks平台如何通过建立统一的数据标准、规范以及实现数据质量监控和元数据管理来提高企业的数据分析效率。我们将通过具体的案例研究和技术实践来展示DataWorks如何简化数据处理流程,减少成本,并加速业务决策。
732 54
|
9月前
|
监控 安全 关系型数据库
精通MySQL:数据库核心技术与应用实践
h3> 一、引言 MySQL作为开源关系型数据库管理系统的佼佼者,凭借其出色的性能、灵活性和稳定性,成为许多企业和开发者的首选
|
4月前
|
机器学习/深度学习 人工智能 算法
人工智能在医疗诊断中的应用与挑战
【10月更文挑战第34天】人工智能(AI)技术正在改变医疗行业的面貌,为诊断过程带来前所未有的效率和准确性。通过深度学习、神经网络等技术,AI能够分析大量数据,辅助医生做出更快速、更准确的诊断决策。然而,AI在医疗领域的应用也面临着数据隐私、算法透明度和医疗责任等一系列挑战。本文将探讨AI在医疗诊断中的具体应用案例,分析其面临的挑战,并提供对未来发展方向的思考。
|
存储 分布式计算 Ubuntu
大数据在单机进行Hadoop的伪分布式安装(安装Linux~Ubuntu 的虚拟机~VirtualBox 和安装 Hadoop)
大数据在单机进行Hadoop的伪分布式安装(安装Linux~Ubuntu 的虚拟机~VirtualBox 和安装 Hadoop)
287 0
大数据在单机进行Hadoop的伪分布式安装(安装Linux~Ubuntu 的虚拟机~VirtualBox 和安装 Hadoop)
|
机器学习/深度学习 自然语言处理 TensorFlow
Sequential与Model模型、keras基本结构功能
不得不说,这深度学习框架更新太快了尤其到了Keras2.0版本,快到Keras中文版好多都是错的,快到官方文档也有旧的没更新,前路坑太多。  到发文为止,已经有theano/tensorflow/CNTK支持keras,虽然说tensorflow造势很多,但是笔者认为接下来Keras才是正道。
3651 0
|
人工智能 数据安全/隐私保护
二战时图灵机破译的Enigma密码,现在AI仅需13分钟便可破译
Enigma在二战时一直被当做是不可破译的密码,英国花费大量时间精力方得破解。现在即使知晓Enigma密码知识,破译仍需要数年的时间,而DigitalOcean及Enigma Pattern公司发明的AI技术,仅需13分钟即可破译。未来这种人工智能软件也可用于医疗保健、金融服务等行业。
4324 0
|
20天前
|
存储 人工智能 测试技术
小鱼深度评测 | 通义灵码2.0,不仅可跨语言编码,自动生成单元测试,更炸裂的是集成DeepSeek模型且免费使用,太炸裂了。
小鱼深度评测 | 通义灵码2.0,不仅可跨语言编码,自动生成单元测试,更炸裂的是集成DeepSeek模型且免费使用,太炸裂了。
141061 20
小鱼深度评测 | 通义灵码2.0,不仅可跨语言编码,自动生成单元测试,更炸裂的是集成DeepSeek模型且免费使用,太炸裂了。
|
19天前
|
人工智能 运维 前端开发
基于阿里百炼的DeepSeek-R1满血版模型调用【零门槛保姆级2084小游戏开发实战】
本文介绍基于阿里百炼的DeepSeek-R1满血版模型调用,提供零门槛保姆级2048小游戏开发实战。文章分为三部分:定位与核心优势、实战部署操作指南、辅助实战开发。通过详细步骤和案例展示,帮助开发者高效利用DeepSeek-R1的强大推理能力,优化游戏逻辑与视觉效果,解决官网响应延迟问题,提升开发效率和用户体验。适合企业开发者、教育行业及多模态探索者使用。
70897 17
基于阿里百炼的DeepSeek-R1满血版模型调用【零门槛保姆级2084小游戏开发实战】
|
27天前
|
人工智能 自然语言处理 Shell
深度评测 | 仅用3分钟,百炼调用满血版 Deepseek-r1 API,百万Token免费用,简直不要太爽。
仅用3分钟,百炼调用满血版Deepseek-r1 API,享受百万免费Token。阿里云提供零门槛、快速部署的解决方案,支持云控制台和Cloud Shell两种方式,操作简便。Deepseek-r1满血版在推理能力上表现出色,尤其擅长数学、代码和自然语言处理任务,使用过程中无卡顿,体验丝滑。结合Chatbox工具,用户可轻松掌控模型,提升工作效率。阿里云大模型服务平台百炼不仅速度快,还确保数据安全,值得信赖。
358010 62
深度评测 | 仅用3分钟,百炼调用满血版 Deepseek-r1 API,百万Token免费用,简直不要太爽。

相关实验场景

更多