HDFS 的写数据流程分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: HDFS的写数据流程是一道比较常见的面试题,同时梳理了写流程也可以帮助我们更加深入一点的了解 HDFS 的主要原理和各个组件的交互过程

一、前言

想必每次去面试都复习这样一道题:HDFS 的读写流程,自然是先百度一番,复制一下答案,1 2 3 4 5 6 点,背一背完事。面试完,还是不了解 HDFS 究竟是怎么设计这个写数据流程的。

其实这个里面也有很多我们值得学习的东西,比如既然写数据到 DataNode,如何保障数据一致性,如何保障数据在写的时候不丢失,重试如何做的,如何做三备份的?

那么这次咱就趴一趴 HDFS 的写数据流程吧。

二、往 HDFS 写数据的客户端代码

我们用 HDFS 的 api ,从一个写数据的代码开始剖析这个过程:

public class TestWriteHdfsFile {     
    public static void main(String[] args) throws IOException {         
        Configuration configuration = new Configuration();         
        FileSystem fileSystem = FileSystem.get(configuration);         
        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/user/data.txt"));         
        fsDataOutputStream.write("contents".getBytes(StandardCharsets.UTF_8));     
    }
}

从这段源码可以看到,写一个数据要有两步,第一步是要 create 一个文件,第二步才是往这个文件写对象。

三、写数据的 create 方法

先获得一个 FileSystem 对象,创建了一个文件返回了 FSDataOutputStream 对象,然后用这个对象写字节数组。

自然得从 create 方法看起了,点进去(当前类:FileSystem)

再点进去(当前类:FileSystem)

再点进去(当前类:FileSystem)

再点进去(当前类:FileSystem)

再点进去,发现是一个抽象方法了(当前类:FileSystem)

ctrl + alt + B,找到实现类,并进入 DistributedFileSystem 类中(当前类:DistributedFileSystem )

再点进去(当前类:DistributedFileSystem )

在 dfs 的 create 方法中,做了三件事情:

  • 往 NameNode 文件目录树的合理位置中添加了 INodeFile 节点;
  • 对这个要往里面写数据的文件,添加了【契约】管理;
  • 启动了 DataStreamer,这是写数据的关键服务。

我们再点进去(当前类:DFSClient)

点进去(当前类:DFSClient)

我们来看 DFSOutputStream 方法的 newStreamForCreate 方法,点进去(当前类:DFSClient)

可以看到,这里即将要调用 NameNode 的代理去执行 create 方法。create 方法再点进去已经没有实现类了,因为这已经是在用 Hadoop Rpc 调用 NameNode 的代理方法了。

具体的实现在 NameNodeRpcServer 里面,NameNode 这一侧大概做了三件事:

(1)创建了一个 INodeFile 加入到内存的文件目录管理树里面;

(2)为当前文件创建契约。所谓契约就是,拥有了这个文件契约的客户端才有权限去写这个文件,防止多个客户端同时写一个文件。

(3)把元数据信息存入 EditLog 日志里面;

(这里面也有很多的逻辑,我们用一个新篇章来讲这些事情)

到目前位置,当前的流程图为:

在 NameNode 这一侧完成了这些事情之后,下面创建了一个很重要的对象:DFSOutputStream,这是写数据流程里面最重要的一个对象。

当前类(DFSOutputStream)

点进去 start 方法看一下,发现是调用的内部的 DataStreamer 类的 start 方法,DataStreamer 类是一个线程。

可以看一下(当前位置:DFSOutputStream 的 DataStreamer 类)

它既然是一个线程,那我们看下这个类的 run 方法是在做什么:

可以看到这是一个 while 循环,里面用 synchronize 锁住了 DataQueue 对象,如果这个队列是空的话,就在 wait 。(当前位置:DFSOutputStream 的 DataStreamer 类)

此时我们的流程图是这样子的:

到目前为止,就是 create 方法,做了这么多的事情。记住,当前还是在 客户端这边的。

四、开始执行 write 方法

我们直接点进去 write() 方法(当前位置:TestWriteFile 客户端测试代码)

发现点进去竟然是在 jdk 的代码里面,这不太对。

我们可以再次从 create 方法点到 这个地方来(当前位置:DistributedFileSystem)

点进去,发现其实它其实真正返回的类是 HdfsDataOutputStream,并且在构造函数里面传了一个 out 对象进来,这个 out 对象就是 DFSOutputStream。(当前位置:DFSClient)

那我们看一下 HdfsDataOutputStream 这个类的 write 方法。搜索了一下,发现没有。那可能在父类里,我们去父类里看一下:

发现父类里有 write() 方法(当前位置:FSDataOutputStream)

调用的是 out 的 write 方法,上面可以看到,其实 out 是 DFSOutputStream。那我们看 DFSOutputStream 的 write 方法,发现又没有,那可能是在父类里面:

发现父类有 write 方法(当前位置:FSOutputSummer)

再进去就是核心写数据的方法了。

五、HDFS 文件最小的组成并不是 Block

到这里,我们要普及一下 HDFS 文件块的组成结构。

一个 Block 是有很多个 Packet(小包)组成的,而每个 Packet 又是有很多个 chunk(块)+checksum(校验码) 组成的。

其中每个 chunk 是 512 byte,校验码是 4 个字节。每个 Packet 是 64K,每个 Packet 是有 127 个 chunk 组成的。

六、继续写流程

上面那个 write 方法是一个字节一个字节往内存里写,每写一个字节都要判断是否缓存到了一定大小(9个chunk大小),如果到了一定大小,就开始 flushBuffer。

我们点到 flushBuffer 方法里来(当前位置:FSOutputSummer)

这里开始 for 循环 byte[] 数组,开始一个 chunk 一个 chunk 的开始写(当前位置:FSOutputSummer)

再往里面点就是抽象类了,我们看它的实现类(当前位置:DFSOutputStream)

我们进入到这个方法里来。先创建一个 Packet 对象(当前位置:DFSOutputStream)

然后往 Packet 对象里写入 chunk 和 checksum 的数据(当前位置:DFSOutputStream)

随着数据的不断写入,Packet 的数据会越来越多。直到一个 Packet 写满 或者 Block 写满,都表示一个 Packet 写满了,开始把 Packet 对象放入到队列中。(当前位置:DFSOutputStream)

继续点进去看。(当前位置:DFSOutputStream)

可以看到,往队列 dataQueue add 了一个 Packet 对象,并且调用了 notifyAll() 方法。

至于什么要放入队列中,为什么要调用 notifyAll() 方法?

还记得上面有一个 DataStreamer 线程吗,它的 run 方法里面,在 while 循环判断 DataQueue 是否有数据,如果 DataQueue 是空的,就在那 wait 。DataStreamer 线程其实就是在等待其他线程往 DataQueue 里面放数据,并且通知它。

直到现在为止,我们的流程是这样的:

七、队列里有数据之后 DataStreamer 进程开始工作

当 DataQueue 里面有数据之后,其他线程调用了 notifyAll() 方法,DataStreamer 线程从 wait 方法开始继续执行。执行时,会再次进入 while 循环中,此时 DataQueue.size() 已经不等于0了,所以跳出 while 循环。(当前位置:DFSOutputStream)

从 DataQueue 里面取一条数据出来(当前位置:DFSOutputStream)

然后开始做一个非常重要的事情,那就是先向 NameNode 申请一个 Block 块,然后建立数据管道。(当前位置:DFSOutputStream)

八、向 NameNode 请求 Block 信息

我们继续点进去 nextBlockOutputStream() 方法。(当前位置:DFSOutputStream)

然后开始调用 NameNode 的方法,向 NameNode 申请 Block 信息:

这个时候,需要去看服务端代码了,我们进入 NameNodeRpcServer ,找到 addBlock 方法。(当前位置:NameNodeRpcServer)

我们可以稍微看一眼它的返回值 LocatedBlock 都有哪些属性,发现它会返回 DataNodeInfo 的信息,也就是说,我们这个 Block 该存到哪些 DataNode 上面。

我们再点到 getAdditionalBlock 方法里去,发现会使用 BlockManager 来选择当前 Block 该存放到哪些 DataNode 上(当前位置:FSNamesystem)

当然这里,肯定不是无脑选择的,肯定会有一些策略,比如负载均衡,机架感知策略。

然后会把 Block 信息挂载到目录树上,然后再写到磁盘上。(当前位置:FSNamesystem)

好了,向 NameNode 申请 Block 大致就是这么多过程,此时我们的流程图是这样的。

九、建立数据管道

到目前位置,Block 信息也申请完毕,下面我们要开始创建数据管道。

那么创建数据管道是个什么样的动作?

所谓数据管道,就是在 DataNode 和 DataNode 预先启动好相关的线程和 Socket。然后客户端是往某一个节点上(DataNode1)去写数据的,然后 DataNode 1 开始往 DataNode2 上写,DataNode2 同时往 DataNode3 写。

那这样做有什么好处?

如果假设客户端就直接往三个节点上写,那么客户端会同时维护三个连接。如果客户端要写很多文件的话,就要维护好多连接,压力比较大。更有甚者,如果客户端和机房不在一个局域网下,那么带宽压力也会非常大。

所以使用数据管道,客户端只需要维护和某一个节点的网络连接就可以了,加速效率,节省带宽,可以说是非常好的设计。

下面继续看源码,上面已经说到,客户端已经向 NameNode 申请了 Block,并且知道要往哪些 DataNode 上写数据了,下面开始建立数据管道。

当前位置:(DataStreamer :nextBlockOutputStream() 方法)

然后建立了一个 Socket 连接,创建了输出流,使用这个输出流来往 DataNode 写数据

开始写 Socket 请求(当前位置:DataStreamer)

我们点进去看一下,注意这里的类型,是 WRITE_BLOCK 类型的,会对应到下文接收方对于不同类型的处理。最后 flush 了,即把数据写了出去(当前位置:Sender)

然后既然 Socket 请求发出去了,肯定 DataNode 有个程序在接收数据,这个类叫 DataXceiverServer,我们打开看一下,发现它是一个线程,那我们来看一下 run 方法(当前位置:DataXceiverServer)。

首先接收 Socket 请求(当前位置:DataXceiverServer):

每发送过来一个 Packet ,都要创建一个 DataXceiver 线程去接收它(当前位置:DataXceiverServer):

我们来看一下 DataXceiver 的 run 方法(因为它是一个线程)(当前位置:DataXceiver),首先会读取数据:

然后处理这些数据(当前位置:DataXceiver):

然后根据不同类型,来做不同的处理(当前位置:Receiver):

然后看一下实现,是一个方法参数定义好几十的方法(当前位置:Receiver):

它是一个抽象方法,我们来看它的实现类 DataXceiver,发现创建了一个 BlockReceiver,来接收数据(当前位置:DataXceiver):

这个方法执行完了之后,发现它会判断是否有下游的机器,如果有,则继续调用 Sender 的writeBlock 方法发送数据

那么这个方法又是刚才的方法了。

所以到现在为止,就是管道建立的过程,也就是把各个 DataNode 上的线程建立起来了,流程图如下所示:

建立数据管道是一个发送 Socket 请求的过程,既然是网络请求,那么肯定会遇到错误,是如何处理的呢?

如果建立过程中遇到错误,会返回 false。如果返回值是 false 的话,那么 namenode 会抛弃这个 block,并且会把错误机器的 DataNode 记录下来。由于这些方法是在 for 循环里面,下一次重新申请 Block 的话,就不会再去分配那台错误的 DataNode 了。

十、客户端收集各个 DataNode 写数据的结果

现在客户端已经和各个 DataNode 建立好了数据管道,开始写数据了。那么写数据的结果一样很重要,需要去收集各个 DataNode 写数据的结果。此时会创建一个重要的对象ResponseProcessor。(当前位置:DataStreamer)

可以看到,它是一个线程,会去读取下游的处理结果(当前位置:ResponseProcessor):

如果是成功的,就把这个 Packet 数据从 ack 中移除。

那么这里的 AckQueue 是什么呢?有什么作用?

我们先看启动了 ResponseProcessor 之后会做什么?会把 Packet 从 DataQueue 中移出去,然后加到 AckQueue 中来。(当前位置:DataStreamer):

最终,终于开始写数据(当前位置:DataStreamer):

下面来解释一下 ,DataQueue 和 AckQueue 是怎么配合使用的

就是客户端在写满一个 Packet 的时候,DataStreamer 线程从 DataQueue 的队列中取了一个出来,准备发到 DataNode,此时它会再写一份到 AckQueue 中,然后才开始建立数据管道,写数据到 DataNode。

等到 DataNode 都写完了,DataNode 之间会一个个汇报自己写数据的结果上来,最终汇报给客户端。如果是成功的,就把这个 packet 从 AckQueue 中移除掉,如果失败了,则重新把这个 Packet 从 AckQueue 中拿到 DataQueue 中重试一次。

十一、DataNode 如何接收数据的

我们继续从第九点,建立数据管道后面开始看,也就是 DataXceiver 开始看(当前位置:DataXceiver):

它创建了一个 PacketResponder,(当前位置:BlockReceiver):

这个对象也很好理解,既然客户端要接收 DataNode 上报的结果创建了 ResponseProcessor ,那么 DataNode 之间也要去知道其他 DataNode 有没写数据成功,也需要一个 PacketResponder 来接收响应。

不过只需要在 DataNode1 和 DataNode 2 上启动就行了,DataNode3 是最后一个节点,是不需要的。

然后看一下 ResponseProcessor 这个线程做了什么。

如果不是下游的最后一个节点,则读取下游返回的结果,(当前位置:BlockReceiver):

如果下游处理成功,则把 Packet 从 AckQueue 中移除。注意这里的 AckQueue 不是客户端的那个,而是 DataNode 里面的 AckQueue,(当前位置:BlockReceiver):

如果处理不成功,那么把 AckQueue 的东西重试一遍就可以了。

然后它就开始不断的接收下游的请求了,在一个 while 循环里面,(当前位置:BlockReceiver)

我们来看这个 receivePacket 方法,它接收到数据之后,做的第一件事情就是把 Packet 数据放到 自己的 ackQueue 里面去,(当前位置:BlockReceiver):

然后把数据写到下游(当前位置:BlockReceiver):

然后校验一下数据(当前位置:BlockReceiver):

没问题的话,就把数据写到本地磁盘上面(当前位置:BlockReceiver):

所以,总结一下,就是当 DataNode 接收到数据以后,第一步就是把这个 Packet 放到自己的 AckQueue 里面,然后把数据发送到下游节点,校验数据并写到自己的磁盘。

同时下游的 DataNode 也会依次做这样的事情。也就是说,客户端在写数据的时候,所有的 DataNode 都会同时往自己的磁盘上写数据,并不是一个个写的。

此时整体流程是这样的:

十二、HDFS 写数据的容错机制

既然是往多个节点写,那么肯定是会出错的,看看它的容错是怎样的:

这里面就是把相关标志标记了一下

然后抛出了异常,最外面是捕获了异常的,捕获到异常后,又把 hasError 标识为 true 了,开始进入下一次循环。

然后循环的时候会走到最后这行代码这:

可以点进去看一下,重新把数据加入到了客户端的 DataQueue 里,并且清空了 AckQueue。

并且开始重新建立数据管道,此时会分成两种情况:

一种情况是,3 副本的情况下,只有一个 DataNode 挂掉了,那么就不再找一台机器再建立管道了,而是继续把数据写完。那么写完肯定就俩副本,会等到 DataNode 向 NameNode 发送心跳时,NameNode 会给 DataNode 一个指令,让它把自己的副本,拷贝到一个别的机器上。

另一种情况是,如果超过了半数都挂了,那么就只能推倒重来,重新建立数据管道了。

十三、总结一下

这个流程还是很复杂的,我们可以稍微理一下脉络

1、首先向 NameNode 建立文件,创建契约;

2、启动了 DataStreamer 线程;

3、写 Packet

4、向 NameNode 申请 Block;

5、建立数据管道以及容错;

6、ResponseProcessor 线程;

7、PacketResponder 线程;

8、写数据的容错。

其实写文章的好处,就是把自己研究过的东西记录下来,因为人的大脑真的很不靠谱,动不动就忘记了。有了笔记,日后可以拿来回忆,就不用重新看了,可以很容易就回忆起来。

好了,谢谢!

相关文章
|
4月前
|
存储 安全
HDFS读写流程详解
HDFS读写流程详解
HDFS读写流程详解
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
57 4
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
53 2
|
2月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
43 2
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
43 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
56 1
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
106 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
51 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
59 0
|
4月前
|
SQL 存储 分布式计算
HDFS数据(跨集群)迁移
HDFS数据(跨集群)迁移