一、前言
想必每次去面试都复习这样一道题:HDFS 的读写流程,自然是先百度一番,复制一下答案,1 2 3 4 5 6 点,背一背完事。面试完,还是不了解 HDFS 究竟是怎么设计这个写数据流程的。
其实这个里面也有很多我们值得学习的东西,比如既然写数据到 DataNode,如何保障数据一致性,如何保障数据在写的时候不丢失,重试如何做的,如何做三备份的?
那么这次咱就趴一趴 HDFS 的写数据流程吧。
二、往 HDFS 写数据的客户端代码
我们用 HDFS 的 api ,从一个写数据的代码开始剖析这个过程:
public class TestWriteHdfsFile { public static void main(String[] args) throws IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(configuration); FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/user/data.txt")); fsDataOutputStream.write("contents".getBytes(StandardCharsets.UTF_8)); } }
从这段源码可以看到,写一个数据要有两步,第一步是要 create 一个文件,第二步才是往这个文件写对象。
三、写数据的 create 方法
先获得一个 FileSystem 对象,创建了一个文件返回了 FSDataOutputStream 对象,然后用这个对象写字节数组。
自然得从 create 方法看起了,点进去(当前类:FileSystem)
再点进去(当前类:FileSystem)
再点进去(当前类:FileSystem)
再点进去(当前类:FileSystem)
再点进去,发现是一个抽象方法了(当前类:FileSystem)
ctrl + alt + B,找到实现类,并进入 DistributedFileSystem 类中(当前类:DistributedFileSystem )
再点进去(当前类:DistributedFileSystem )
在 dfs 的 create 方法中,做了三件事情:
- 往 NameNode 文件目录树的合理位置中添加了 INodeFile 节点;
- 对这个要往里面写数据的文件,添加了【契约】管理;
- 启动了 DataStreamer,这是写数据的关键服务。
我们再点进去(当前类:DFSClient)
点进去(当前类:DFSClient)
我们来看 DFSOutputStream 方法的 newStreamForCreate 方法,点进去(当前类:DFSClient)
可以看到,这里即将要调用 NameNode 的代理去执行 create 方法。create 方法再点进去已经没有实现类了,因为这已经是在用 Hadoop Rpc 调用 NameNode 的代理方法了。
具体的实现在 NameNodeRpcServer 里面,NameNode 这一侧大概做了三件事:
(1)创建了一个 INodeFile 加入到内存的文件目录管理树里面;
(2)为当前文件创建契约。所谓契约就是,拥有了这个文件契约的客户端才有权限去写这个文件,防止多个客户端同时写一个文件。
(3)把元数据信息存入 EditLog 日志里面;
(这里面也有很多的逻辑,我们用一个新篇章来讲这些事情)
到目前位置,当前的流程图为:
在 NameNode 这一侧完成了这些事情之后,下面创建了一个很重要的对象:DFSOutputStream,这是写数据流程里面最重要的一个对象。
当前类(DFSOutputStream)
点进去 start 方法看一下,发现是调用的内部的 DataStreamer 类的 start 方法,DataStreamer 类是一个线程。
可以看一下(当前位置:DFSOutputStream 的 DataStreamer 类)
它既然是一个线程,那我们看下这个类的 run 方法是在做什么:
可以看到这是一个 while 循环,里面用 synchronize 锁住了 DataQueue 对象,如果这个队列是空的话,就在 wait 。(当前位置:DFSOutputStream 的 DataStreamer 类)
此时我们的流程图是这样子的:
到目前为止,就是 create 方法,做了这么多的事情。记住,当前还是在 客户端这边的。
四、开始执行 write 方法
我们直接点进去 write() 方法(当前位置:TestWriteFile 客户端测试代码)
发现点进去竟然是在 jdk 的代码里面,这不太对。
我们可以再次从 create 方法点到 这个地方来(当前位置:DistributedFileSystem)
点进去,发现其实它其实真正返回的类是 HdfsDataOutputStream,并且在构造函数里面传了一个 out 对象进来,这个 out 对象就是 DFSOutputStream。(当前位置:DFSClient)
那我们看一下 HdfsDataOutputStream 这个类的 write 方法。搜索了一下,发现没有。那可能在父类里,我们去父类里看一下:
发现父类里有 write() 方法(当前位置:FSDataOutputStream)
调用的是 out 的 write 方法,上面可以看到,其实 out 是 DFSOutputStream。那我们看 DFSOutputStream 的 write 方法,发现又没有,那可能是在父类里面:
发现父类有 write 方法(当前位置:FSOutputSummer)
再进去就是核心写数据的方法了。
五、HDFS 文件最小的组成并不是 Block
到这里,我们要普及一下 HDFS 文件块的组成结构。
一个 Block 是有很多个 Packet(小包)组成的,而每个 Packet 又是有很多个 chunk(块)+checksum(校验码) 组成的。
其中每个 chunk 是 512 byte,校验码是 4 个字节。每个 Packet 是 64K,每个 Packet 是有 127 个 chunk 组成的。
六、继续写流程
上面那个 write 方法是一个字节一个字节往内存里写,每写一个字节都要判断是否缓存到了一定大小(9个chunk大小),如果到了一定大小,就开始 flushBuffer。
我们点到 flushBuffer 方法里来(当前位置:FSOutputSummer)
这里开始 for 循环 byte[] 数组,开始一个 chunk 一个 chunk 的开始写(当前位置:FSOutputSummer)
再往里面点就是抽象类了,我们看它的实现类(当前位置:DFSOutputStream)
我们进入到这个方法里来。先创建一个 Packet 对象(当前位置:DFSOutputStream)
然后往 Packet 对象里写入 chunk 和 checksum 的数据(当前位置:DFSOutputStream)
随着数据的不断写入,Packet 的数据会越来越多。直到一个 Packet 写满 或者 Block 写满,都表示一个 Packet 写满了,开始把 Packet 对象放入到队列中。(当前位置:DFSOutputStream)
继续点进去看。(当前位置:DFSOutputStream)
可以看到,往队列 dataQueue add 了一个 Packet 对象,并且调用了 notifyAll() 方法。
至于什么要放入队列中,为什么要调用 notifyAll() 方法?
还记得上面有一个 DataStreamer 线程吗,它的 run 方法里面,在 while 循环判断 DataQueue 是否有数据,如果 DataQueue 是空的,就在那 wait 。DataStreamer 线程其实就是在等待其他线程往 DataQueue 里面放数据,并且通知它。
直到现在为止,我们的流程是这样的:
七、队列里有数据之后 DataStreamer 进程开始工作
当 DataQueue 里面有数据之后,其他线程调用了 notifyAll() 方法,DataStreamer 线程从 wait 方法开始继续执行。执行时,会再次进入 while 循环中,此时 DataQueue.size() 已经不等于0了,所以跳出 while 循环。(当前位置:DFSOutputStream)
从 DataQueue 里面取一条数据出来(当前位置:DFSOutputStream)
然后开始做一个非常重要的事情,那就是先向 NameNode 申请一个 Block 块,然后建立数据管道。(当前位置:DFSOutputStream)
八、向 NameNode 请求 Block 信息
我们继续点进去 nextBlockOutputStream() 方法。(当前位置:DFSOutputStream)
然后开始调用 NameNode 的方法,向 NameNode 申请 Block 信息:
这个时候,需要去看服务端代码了,我们进入 NameNodeRpcServer ,找到 addBlock 方法。(当前位置:NameNodeRpcServer)
我们可以稍微看一眼它的返回值 LocatedBlock 都有哪些属性,发现它会返回 DataNodeInfo 的信息,也就是说,我们这个 Block 该存到哪些 DataNode 上面。
我们再点到 getAdditionalBlock 方法里去,发现会使用 BlockManager 来选择当前 Block 该存放到哪些 DataNode 上(当前位置:FSNamesystem)
当然这里,肯定不是无脑选择的,肯定会有一些策略,比如负载均衡,机架感知策略。
然后会把 Block 信息挂载到目录树上,然后再写到磁盘上。(当前位置:FSNamesystem)
好了,向 NameNode 申请 Block 大致就是这么多过程,此时我们的流程图是这样的。
九、建立数据管道
到目前位置,Block 信息也申请完毕,下面我们要开始创建数据管道。
那么创建数据管道是个什么样的动作?
所谓数据管道,就是在 DataNode 和 DataNode 预先启动好相关的线程和 Socket。然后客户端是往某一个节点上(DataNode1)去写数据的,然后 DataNode 1 开始往 DataNode2 上写,DataNode2 同时往 DataNode3 写。
那这样做有什么好处?
如果假设客户端就直接往三个节点上写,那么客户端会同时维护三个连接。如果客户端要写很多文件的话,就要维护好多连接,压力比较大。更有甚者,如果客户端和机房不在一个局域网下,那么带宽压力也会非常大。
所以使用数据管道,客户端只需要维护和某一个节点的网络连接就可以了,加速效率,节省带宽,可以说是非常好的设计。
下面继续看源码,上面已经说到,客户端已经向 NameNode 申请了 Block,并且知道要往哪些 DataNode 上写数据了,下面开始建立数据管道。
当前位置:(DataStreamer :nextBlockOutputStream() 方法)
然后建立了一个 Socket 连接,创建了输出流,使用这个输出流来往 DataNode 写数据
开始写 Socket 请求(当前位置:DataStreamer)
我们点进去看一下,注意这里的类型,是 WRITE_BLOCK 类型的,会对应到下文接收方对于不同类型的处理。最后 flush 了,即把数据写了出去(当前位置:Sender)
然后既然 Socket 请求发出去了,肯定 DataNode 有个程序在接收数据,这个类叫 DataXceiverServer,我们打开看一下,发现它是一个线程,那我们来看一下 run 方法(当前位置:DataXceiverServer)。
首先接收 Socket 请求(当前位置:DataXceiverServer):
每发送过来一个 Packet ,都要创建一个 DataXceiver 线程去接收它(当前位置:DataXceiverServer):
我们来看一下 DataXceiver 的 run 方法(因为它是一个线程)(当前位置:DataXceiver),首先会读取数据:
然后处理这些数据(当前位置:DataXceiver):
然后根据不同类型,来做不同的处理(当前位置:Receiver):
然后看一下实现,是一个方法参数定义好几十的方法(当前位置:Receiver):
它是一个抽象方法,我们来看它的实现类 DataXceiver,发现创建了一个 BlockReceiver,来接收数据(当前位置:DataXceiver):
这个方法执行完了之后,发现它会判断是否有下游的机器,如果有,则继续调用 Sender 的writeBlock 方法发送数据
那么这个方法又是刚才的方法了。
所以到现在为止,就是管道建立的过程,也就是把各个 DataNode 上的线程建立起来了,流程图如下所示:
建立数据管道是一个发送 Socket 请求的过程,既然是网络请求,那么肯定会遇到错误,是如何处理的呢?
如果建立过程中遇到错误,会返回 false。如果返回值是 false 的话,那么 namenode 会抛弃这个 block,并且会把错误机器的 DataNode 记录下来。由于这些方法是在 for 循环里面,下一次重新申请 Block 的话,就不会再去分配那台错误的 DataNode 了。
十、客户端收集各个 DataNode 写数据的结果
现在客户端已经和各个 DataNode 建立好了数据管道,开始写数据了。那么写数据的结果一样很重要,需要去收集各个 DataNode 写数据的结果。此时会创建一个重要的对象ResponseProcessor。(当前位置:DataStreamer)
可以看到,它是一个线程,会去读取下游的处理结果(当前位置:ResponseProcessor):
如果是成功的,就把这个 Packet 数据从 ack 中移除。
那么这里的 AckQueue 是什么呢?有什么作用?
我们先看启动了 ResponseProcessor 之后会做什么?会把 Packet 从 DataQueue 中移出去,然后加到 AckQueue 中来。(当前位置:DataStreamer):
最终,终于开始写数据(当前位置:DataStreamer):
下面来解释一下 ,DataQueue 和 AckQueue 是怎么配合使用的
就是客户端在写满一个 Packet 的时候,DataStreamer 线程从 DataQueue 的队列中取了一个出来,准备发到 DataNode,此时它会再写一份到 AckQueue 中,然后才开始建立数据管道,写数据到 DataNode。
等到 DataNode 都写完了,DataNode 之间会一个个汇报自己写数据的结果上来,最终汇报给客户端。如果是成功的,就把这个 packet 从 AckQueue 中移除掉,如果失败了,则重新把这个 Packet 从 AckQueue 中拿到 DataQueue 中重试一次。
十一、DataNode 如何接收数据的
我们继续从第九点,建立数据管道后面开始看,也就是 DataXceiver 开始看(当前位置:DataXceiver):
它创建了一个 PacketResponder,(当前位置:BlockReceiver):
这个对象也很好理解,既然客户端要接收 DataNode 上报的结果创建了 ResponseProcessor ,那么 DataNode 之间也要去知道其他 DataNode 有没写数据成功,也需要一个 PacketResponder 来接收响应。
不过只需要在 DataNode1 和 DataNode 2 上启动就行了,DataNode3 是最后一个节点,是不需要的。
然后看一下 ResponseProcessor 这个线程做了什么。
如果不是下游的最后一个节点,则读取下游返回的结果,(当前位置:BlockReceiver):
如果下游处理成功,则把 Packet 从 AckQueue 中移除。注意这里的 AckQueue 不是客户端的那个,而是 DataNode 里面的 AckQueue,(当前位置:BlockReceiver):
如果处理不成功,那么把 AckQueue 的东西重试一遍就可以了。
然后它就开始不断的接收下游的请求了,在一个 while 循环里面,(当前位置:BlockReceiver)
我们来看这个 receivePacket 方法,它接收到数据之后,做的第一件事情就是把 Packet 数据放到 自己的 ackQueue 里面去,(当前位置:BlockReceiver):
然后把数据写到下游(当前位置:BlockReceiver):
然后校验一下数据(当前位置:BlockReceiver):
没问题的话,就把数据写到本地磁盘上面(当前位置:BlockReceiver):
所以,总结一下,就是当 DataNode 接收到数据以后,第一步就是把这个 Packet 放到自己的 AckQueue 里面,然后把数据发送到下游节点,校验数据并写到自己的磁盘。
同时下游的 DataNode 也会依次做这样的事情。也就是说,客户端在写数据的时候,所有的 DataNode 都会同时往自己的磁盘上写数据,并不是一个个写的。
此时整体流程是这样的:
十二、HDFS 写数据的容错机制
既然是往多个节点写,那么肯定是会出错的,看看它的容错是怎样的:
这里面就是把相关标志标记了一下
然后抛出了异常,最外面是捕获了异常的,捕获到异常后,又把 hasError 标识为 true 了,开始进入下一次循环。
然后循环的时候会走到最后这行代码这:
可以点进去看一下,重新把数据加入到了客户端的 DataQueue 里,并且清空了 AckQueue。
并且开始重新建立数据管道,此时会分成两种情况:
一种情况是,3 副本的情况下,只有一个 DataNode 挂掉了,那么就不再找一台机器再建立管道了,而是继续把数据写完。那么写完肯定就俩副本,会等到 DataNode 向 NameNode 发送心跳时,NameNode 会给 DataNode 一个指令,让它把自己的副本,拷贝到一个别的机器上。
另一种情况是,如果超过了半数都挂了,那么就只能推倒重来,重新建立数据管道了。
十三、总结一下
这个流程还是很复杂的,我们可以稍微理一下脉络
1、首先向 NameNode 建立文件,创建契约;
2、启动了 DataStreamer 线程;
3、写 Packet
4、向 NameNode 申请 Block;
5、建立数据管道以及容错;
6、ResponseProcessor 线程;
7、PacketResponder 线程;
8、写数据的容错。
其实写文章的好处,就是把自己研究过的东西记录下来,因为人的大脑真的很不靠谱,动不动就忘记了。有了笔记,日后可以拿来回忆,就不用重新看了,可以很容易就回忆起来。
好了,谢谢!