在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示:
01 |
package org.shirdrn.hadoop.hdfs; |
03 |
import java.io.BufferedReader; |
04 |
import java.io.IOException; |
05 |
import java.io.InputStreamReader; |
07 |
import org.apache.hadoop.conf.Configuration; |
08 |
import org.apache.hadoop.fs.FSDataInputStream; |
09 |
import org.apache.hadoop.fs.FileSystem; |
10 |
import org.apache.hadoop.fs.Path; |
12 |
public class HdfsFileReader { |
14 |
public static void main(String[] args) { |
16 |
Path path = new Path(file); |
18 |
Configuration conf = new Configuration(); |
21 |
BufferedReader reader = null ; |
23 |
fs = FileSystem.get(conf); |
25 |
reader = new BufferedReader( new InputStreamReader(in)); |
27 |
while ((line = reader.readLine()) != null ) { |
28 |
System.out.println( "Record: " + line); |
30 |
} catch (IOException e) { |
34 |
if (reader != null ) reader.close(); |
35 |
} catch (IOException e) { |
基于上面代码,我们可以看到,通过一个FileSystem对象可以打开一个Path文件,返回一个FSDataInputStream文件输入流对象,然后从该FSDataInputStream对象就能够读取出文件的内容。所以,我们从FSDataInputStream入手,详细分析从HDFS读取文件内容的过程,在实际地读取物理数据块之前,首先要获取到文件对应的Block列表元数据信息,整体流程如下图所示:
下面,详细说明整个流程:
创建FSDataInputStream流对象
从一个Path路径对象,能够获取到一个FileSystem对象,然后通过调用FileSystem的open方法打开一个文件流:
1 |
public FSDataInputStream open(Path f) throws IOException { |
2 |
return open(f, getConf().getInt( "io.file.buffer.size" , 4096 )); |
由于FileSystem是抽象类,将具体的打开操作留给具体子类实现,例如FTPFileSystem、HarFileSystem、WebHdfsFileSystem等,不同的文件系统具有不同打开文件的行为,我们以DistributedFileSystem为例,open方法实现,代码如下所示:
1 |
public FSDataInputStream open(Path f, int bufferSize) throws IOException { |
2 |
statistics.incrementReadOps( 1 ); |
3 |
return new DFSClient.DFSDataInputStream( |
4 |
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)); |
statistics对象用来收集文件系统操作的统计数据,这里使读取文件操作的计数器加1。然后创建了一个DFSClient.DFSDataInputStream对象,该对象的参数是通过DFSClient dfs客户端对象打开一个这个文件从而返回一个DFSInputStream对象,下面,我们看DFSClient的open方法实现,代码如下所示:
1 |
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum, |
2 |
FileSystem.Statistics stats) throws IOException { |
5 |
return new DFSInputStream(src, buffersize, verifyChecksum); |
checkOpen方法就是检查一个标志位clientRunning,表示当前的dfs客户端对象是否已经创建并初始化,在dfs客户端创建的时候该标志就为true,表示客户端正在运行状态。我们知道,当客户端DFSClient连接到Namenode的时候,实际上是创建了一个到Namenode的RPC连接,Namenode作为Server角色,DFSClient作为Client角色,它们之间建立起Socket连接。只有显式调用DFSClient的close方法时,才会修改clientRunning的值为false,实际上真正地关闭了已经建立的RPC连接。
我们看一下创建DFSInputStream的构造方法实现:
1 |
DFSInputStream(String src, int buffersize, boolean verifyChecksum) throws IOException { |
2 |
this .verifyChecksum = verifyChecksum; |
3 |
this .buffersize = buffersize; |
5 |
prefetchSize = conf.getLong( "dfs.read.prefetch.size" , prefetchSize); |
先设置了几个与读取文件相关的参数值,这里有一个预先读取文件的Block字节数的参数prefetchSize,它的值设置如下:
1 |
public static final long DEFAULT_BLOCK_SIZE = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; |
2 |
public static final long DFS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024 ; |
4 |
defaultBlockSize = conf.getLong( "dfs.block.size" , DEFAULT_BLOCK_SIZE); |
5 |
private long prefetchSize = 10 * defaultBlockSize; |
这个prefetchSize的值默认为10*64*1024*1024=671088640,也就是说,默认预读取一个文件的10个块,即671088640B=640M,如果想要修改这个值,设置dfs.block.size即可覆盖默认值。
然后调用了openInfo方法,从Namenode获取到该打开文件的信息,在openInfo方法中,具体实现如下所示:
01 |
synchronized void openInfo() throws IOException { |
02 |
for ( int retries = 3 ; retries > 0 ; retries--) { |
03 |
if (fetchLocatedBlocks()) { |
10 |
DFSClient.LOG.warn( "Last block locations unavailable. " |
11 |
+ "Datanodes might not have reported blocks completely." |
12 |
+ " Will retry for " + retries + " times" ); |
16 |
throw new IOException( "Could not obtain the last block locations." ); |
上述代码中,有一个for循环用来获取Block列表。如果成功获取到待读取文件的Block列表,则直接返回,否则,最多执行3次等待重试操作(最多花费时间大于12秒)。未能成功读取文件的Block列表信息,是因为Namenode无法获取到文件对应的块列表的信息,当整个集群启动的时候,Datanode会主动向NNamenode上报对应的Block信息,只有Block Report完成之后,Namenode就能够知道组成文件的Block及其所在Datanode列表的信息。openInfo方法方法中调用了fetchLocatedBlocks方法,用来与Namenode进行RPC通信调用,实际获取对应的Block列表,实现代码如下所示:
01 |
private boolean fetchLocatedBlocks() throws IOException, |
02 |
FileNotFoundException { |
03 |
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0 , prefetchSize); |
04 |
if (newInfo == null ) { |
05 |
throw new FileNotFoundException( "File does not exist: " + src); |
08 |
if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() && !newInfo.isUnderConstruction()) { |
09 |
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); |
10 |
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); |
11 |
while (oldIter.hasNext() && newIter.hasNext()) { |
12 |
if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) { |
13 |
throw new IOException( "Blocklist for " + src + " has changed!" ); |
17 |
boolean isBlkInfoUpdated = updateBlockInfo(newInfo); |
18 |
this .locatedBlocks = newInfo; |
19 |
this .currentNode = null ; |
20 |
return isBlkInfoUpdated; |
调用callGetBlockLocations方法,实际上是根据创建RPC连接以后得到的Namenode的代理对象,调用Namenode来获取到指定文件的Block的位置信息(位于哪些Datanode节点上):namenode.getBlockLocations(src, start, length)。调用callGetBlockLocations方法返回一个LocatedBlocks对象,该对象包含了文件长度信息、List blocks列表对象,其中LocatedBlock包含了一个Block的基本信息:
3 |
private DatanodeInfo[] locs; |
4 |
private boolean corrupt; |
有了这些文件的信息(文件长度、文件包含的Block的位置等信息),DFSClient就能够执行后续读取文件数据的操作了,详细过程我们在后面分析说明。
通过Namenode获取文件信息
上面,我们提到获取一个文件的基本信息,是通过Namenode来得到的,这里详细分析Namenode是如何获取到这些文件信息的,实现方法getBlockLocations的代码,如下所示:
1 |
public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException { |
2 |
myMetrics.incrNumGetBlockLocations(); |
3 |
return namesystem.getBlockLocations(getClientMachine(), src, offset, length); |
可以看到,Namenode又委托管理HDFS name元数据的FSNamesystem的getBlockLocations方法实现:
01 |
LocatedBlocks getBlockLocations(String clientMachine, String src, long offset, long length) throws IOException { |
02 |
LocatedBlocks blocks = getBlockLocations(src, offset, length, true , true , true ); |
09 |
Node client = host2DataNodeMap.getDatanodeByHost(clientMachine); |
11 |
List<String> hosts = new ArrayList<String> ( 1 ); |
12 |
hosts.add(clientMachine); |
13 |
String rName = dnsToSwitchMapping.resolve(hosts).get( 0 ); |
15 |
client = new NodeBase(clientMachine, rName); |
18 |
DFSUtil.StaleComparator comparator = null ; |
19 |
if (avoidStaleDataNodesForRead) { |
20 |
comparator = new DFSUtil.StaleComparator(staleInterval); |
23 |
for (LocatedBlock b : blocks.getLocatedBlocks()) { |
24 |
clusterMap.pseudoSortByDistance(client, b.getLocations()); |
25 |
if (avoidStaleDataNodesForRead) { |
26 |
Arrays.sort(b.getLocations(), comparator); |
跟踪代码,最终会在下面的方法中实现了,如何获取到待读取文件的Block的元数据列表,以及如何取出该文件的各个Block的数据,方法实现代码,这里我做了详细的注释,可以参考,如下所示:
01 |
private synchronized LocatedBlocks getBlockLocationsInternal(String src, |
06 |
boolean needBlockToken) |
08 |
INodeFile inode = dir.getFileINode(src); |
12 |
if (doAccessTime && isAccessTimeSupported()) { |
13 |
dir.setTimes(src, inode, - 1 , now(), false ); |
15 |
Block[] blocks = inode.getBlocks(); |
19 |
if (blocks.length == 0 ) { |
20 |
return inode.createLocatedBlocks( new ArrayList<LocatedBlock>(blocks.length)); |
22 |
List<LocatedBlock> results; |
23 |
results = new ArrayList<LocatedBlock>(blocks.length); |
26 |
long curPos = 0 , blkSize = 0 ; |
27 |
int nrBlocks = (blocks[ 0 ].getNumBytes() == 0 ) ? 0 : blocks.length; |
28 |
for (curBlk = 0 ; curBlk < nrBlocks; curBlk++) { |
29 |
blkSize = blocks[curBlk].getNumBytes(); |
30 |
assert blkSize > 0 : "Block of size 0" ; |
31 |
if (curPos + blkSize > offset) { |
37 |
if (nrBlocks > 0 && curBlk == nrBlocks) |
40 |
long endOff = offset + length; |
44 |
int numNodes = blocksMap.numNodes(blocks[curBlk]); |
45 |
int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas(); |
46 |
int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); |
47 |
if (numCorruptNodes != numCorruptReplicas) { |
48 |
LOG.warn( "Inconsistent number of corrupt replicas for " |
49 |
+ blocks[curBlk] + "blockMap has " + numCorruptNodes |
50 |
+ " but corrupt replicas map has " + numCorruptReplicas); |
52 |
DatanodeDescriptor[] machineSet = null ; |
53 |
boolean blockCorrupt = false ; |
54 |
if (inode.isUnderConstruction() && curBlk == blocks.length - 1 |
55 |
&& blocksMap.numNodes(blocks[curBlk]) == 0 ) { |
57 |
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) inode; |
58 |
machineSet = cons.getTargets(); |
61 |
blockCorrupt = (numCorruptNodes == numNodes); |
62 |
int numMachineSet = blockCorrupt ? numNodes : (numNodes - numCorruptNodes); |
63 |
machineSet = new DatanodeDescriptor[numMachineSet]; |
64 |
if (numMachineSet > 0 ) { |
66 |
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) { |
67 |
DatanodeDescriptor dn = it.next(); |
68 |
boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn); |
69 |
if (blockCorrupt || (!blockCorrupt && !replicaCorrupt)) |
70 |
machineSet[numNodes++] = dn; |
74 |
LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos, blockCorrupt); |
75 |
if (isAccessTokenEnabled && needBlockToken) { |
76 |
b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ))); |
80 |
curPos += blocks[curBlk].getNumBytes(); |
82 |
} while (curPos < endOff && curBlk < blocks.length && results.size() < nrBlocksToReturn); |
84 |
return inode.createLocatedBlocks(results); |
我们可以看一下,最后的调用inode.createLocatedBlocks(results)生成LocatedBlocks对象的实现,代码如下所示:
1 |
LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) { |
2 |
return new LocatedBlocks(computeContentSummary().getLength(), blocks, isUnderConstruction()); |
客户端通过RPC调用,获取到了文件对应的Block以及所在Datanode列表的信息,然后就可以根据LocatedBlocks来进一步获取到对应的Block对应的物理数据块。
对Block列表进行排序
我们再回到FSNamesystem类,调用getBlockLocationsInternal方法的getBlockLocations方法中,在返回文件block列表LocatedBlocks之后,会对每一个Block所在的Datanode进行的一个排序,排序的基本规则有如下2点:
- Client到Block所在的Datanode的距离最近,这个是通过网络拓扑关系来进行计算,例如Client的网络路径为/dc1/r1/c1,那么路径为/dc1/r1/dn1的Datanode就比路径为/dc1/r2/dn2的距离小,/dc1/r1/dn1对应的Block就会排在前面
- 从上面一点可以推出,如果Client就是某个Datanode,恰好某个Block的Datanode列表中包括该Datanode,则该Datanode对应的Block排在前面
- Block所在的Datanode列表中,如果其中某个Datanode在指定的时间内没有向Namenode发送heartbeat(默认由常量DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT定义,默认值为30s),则该Datanode的状态即为STALE,具有该状态的Datanode对应的Block排在后面
基于上述规则排序后,Block列表返回到Client。
Client与Datanode交互更新文件Block列表
我们要回到前面分析的DFSClient.DFSInputStream.fetchLocatedBlocks()方法中,查看在调用该方法之后,是如何执行实际处理逻辑的:
01 |
private boolean fetchLocatedBlocks() throws IOException, |
02 |
FileNotFoundException { |
03 |
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0 , prefetchSize); |
04 |
if (newInfo == null ) { |
05 |
throw new FileNotFoundException( "File does not exist: " + src); |
08 |
if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() && !newInfo.isUnderConstruction()) { |
09 |
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); |
10 |
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); |
11 |
while (oldIter.hasNext() && newIter.hasNext()) { |
12 |
if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) { |
13 |
throw new IOException( "Blocklist for " + src + " has changed!" ); |
17 |
boolean isBlkInfoUpdated = updateBlockInfo(newInfo); |
18 |
this .locatedBlocks = newInfo; |
19 |
this .currentNode = null ; |
20 |
return isBlkInfoUpdated; |
如果第一次读取该文件时,已经获取到了对应的block列表,缓存在客户端;如果客户端第二次又读取了该文件,仍然获取到一个block列表对象。在两次读取之间,可能存在原文件完全被重写的情况,所以新得到的block列表与原列表完全不同了,存在这种情况,客户端直接抛出IO异常,如果原文件对应的block列表没有变化,则更新客户端缓存的对应block列表信息。
当集群重启的时候(如果允许安全模式下读文件),或者当一个文件正在创建的时候,Datanode向Namenode进行Block Report,这个过程中可能Namenode还没有完全重建好Block到Datanode的映射关系信息,所以即使在这种情况下,仍然会返回对应的正在创建的Block所在的Datanode列表信息,可以从前面getBlockLocationsInternal方法中看到,INode的对应UnderConstruction状态为true。这时,一个Block对应的所有副本中的某些可能还在创建过程中。
上面方法中,调用updateBlockInfo来更新文件的Block元数据列表信息,对于文件的某些Block可能没有创建完成,所以Namenode所保存的关于文件的Block的的元数据信息可能没有及时更新(Datanode可能还没有完成Block的报告),代码实现如下所示:
01 |
private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException { |
02 |
if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction() || !(newInfo.locatedBlockCount() > 0 )) { |
06 |
LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1 ); |
07 |
boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo.getFileLength()); |
08 |
if (!lastBlockInFile) { |
12 |
if (last.getLocations().length == 0 ) { |
16 |
ClientDatanodeProtocol primary = null ; |
17 |
Block newBlock = null ; |
18 |
for ( int i = 0 ; i < last.getLocations().length && newBlock == null ; i++) { |
19 |
DatanodeInfo datanode = last.getLocations()[i]; |
21 |
primary = createClientDatanodeProtocolProxy(datanode, conf, last .getBlock(), last.getBlockToken(), socketTimeout, connectToDnViaHostname); |
22 |
newBlock = primary.getBlockInfo(last.getBlock()); |
23 |
} catch (IOException e) { |
24 |
if (e.getMessage().startsWith( |
25 |
"java.io.IOException: java.lang.NoSuchMethodException: " |
26 |
+ "org.apache.hadoop.hdfs.protocol" |
27 |
+ ".ClientDatanodeProtocol.getBlockInfo" )) { |
29 |
serverSupportsHdfs200 = false ; |
31 |
LOG.info( "Failed to get block info from " |
32 |
+ datanode.getHostName() + " probably does not have " |
33 |
+ last.getBlock(), e); |
36 |
if (primary != null ) { |
37 |
RPC.stopProxy(primary); |
42 |
if (newBlock == null ) { |
43 |
if (!serverSupportsHdfs200) { |
46 |
throw new IOException( "Failed to get block info from any of the DN in pipeline: " + Arrays.toString(last.getLocations())); |
49 |
long newBlockSize = newBlock.getNumBytes(); |
50 |
long delta = newBlockSize - last.getBlockSize(); |
52 |
last.getBlock().setNumBytes(newBlockSize); |
53 |
long newlength = newInfo.getFileLength() + delta; |
54 |
newInfo.setFileLength(newlength); |
55 |
LOG.debug( "DFSClient setting last block " + last + " to length " + newBlockSize + " filesize is now " + newInfo.getFileLength()); |
我们看一下,在updateBlockInfo方法中,返回false的情况:Client向Namenode发起的RPC请求,已经获取到了组成该文件的数据块的元数据信息列表,但是,文件的最后一个数据块的存储位置信息无法获取到,说明Datanode还没有及时通过block report将数据块的存储位置信息报告给Namenode。通过在openInfo()方法中可以看到,获取文件的block列表信息有3次重试机会,也就是调用updateBlockInfo方法返回false,可以有12秒的时间,等待Datanode向Namenode汇报文件的最后一个块的位置信息,以及Namenode更新内存中保存的文件对应的数据块列表元数据信息。
我们再看一下,在updateBlockInfo方法中,返回true的情况:
- 文件已经创建完成,文件对应的block列表元数据信息可用
- 文件正在创建中,但是当前能够读取到的已经完成的最后一个块(非组成文件的最后一个block)的元数据信息可用
- 文件正在创建中,文件的最后一个block的元数据部分可读:从Namenode无法获取到该block对应的位置信息,这时Client会与Datanode直接进行RPC通信,获取到该文件最后一个block的位置信息
上面Client会与Datanode直接进行RPC通信,获取文件最后一个block的元数据,这时可能由于网络问题等等,无法得到文件最后一个block的元数据,所以也会返回true,也就是说,Client仍然可以读取该文件,只是无法读取到最后一个block的数据。
这样,在Client从Namenode/Datanode获取到的文件的Block列表元数据已经是可用的信息,可以根据这些信息读取到各个Block的物理数据块内容了,准确地说,应该是文件处于打开状态了,已经准备好后续进行的读操作了。