HDFS读文件过程分析:获取文件对应的Block列表

简介:

在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据。对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的。我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示:

01 package org.shirdrn.hadoop.hdfs;
02
03 import java.io.BufferedReader;
04 import java.io.IOException;
05 import java.io.InputStreamReader;
06
07 import org.apache.hadoop.conf.Configuration;
08 import org.apache.hadoop.fs.FSDataInputStream;
09 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11
12 public class HdfsFileReader {
13
14 public static void main(String[] args) {
15 String file = "hdfs://hadoop-cluster-m:8020/data/logs/basis_user_behavior/201405071237_10_10_1_73.log";
16 Path path = new Path(file);
17
18 Configuration conf = new Configuration();
19 FileSystem fs;
20 FSDataInputStream in;
21 BufferedReader reader = null;
22 try {
23 fs = FileSystem.get(conf);
24 in = fs.open(path); // 打开文件path,返回一个FSDataInputStream流对象
25 reader = new BufferedReader(new InputStreamReader(in));
26 String line = null;
27 while((line = reader.readLine()) != null) { // 读取文件行内容
28 System.out.println("Record: " + line);
29 }
30 } catch (IOException e) {
31 e.printStackTrace();
32 } finally {
33 try {
34 if(reader != null) reader.close();
35 } catch (IOException e) {
36 e.printStackTrace();
37 }
38 }
39 }
40
41 }

基于上面代码,我们可以看到,通过一个FileSystem对象可以打开一个Path文件,返回一个FSDataInputStream文件输入流对象,然后从该FSDataInputStream对象就能够读取出文件的内容。所以,我们从FSDataInputStream入手,详细分析从HDFS读取文件内容的过程,在实际地读取物理数据块之前,首先要获取到文件对应的Block列表元数据信息,整体流程如下图所示:

下面,详细说明整个流程:

创建FSDataInputStream流对象

从一个Path路径对象,能够获取到一个FileSystem对象,然后通过调用FileSystem的open方法打开一个文件流:

1 public FSDataInputStream open(Path f) throws IOException {
2 return open(f, getConf().getInt("io.file.buffer.size", 4096));
3 }

由于FileSystem是抽象类,将具体的打开操作留给具体子类实现,例如FTPFileSystem、HarFileSystem、WebHdfsFileSystem等,不同的文件系统具有不同打开文件的行为,我们以DistributedFileSystem为例,open方法实现,代码如下所示:

1 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
2 statistics.incrementReadOps(1);
3 return new DFSClient.DFSDataInputStream(
4 dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
5 }

statistics对象用来收集文件系统操作的统计数据,这里使读取文件操作的计数器加1。然后创建了一个DFSClient.DFSDataInputStream对象,该对象的参数是通过DFSClient dfs客户端对象打开一个这个文件从而返回一个DFSInputStream对象,下面,我们看DFSClient的open方法实现,代码如下所示:

1 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
2 FileSystem.Statistics stats) throws IOException {
3 checkOpen();
4 // Get block info from namenode
5 return new DFSInputStream(src, buffersize, verifyChecksum);
6 }

checkOpen方法就是检查一个标志位clientRunning,表示当前的dfs客户端对象是否已经创建并初始化,在dfs客户端创建的时候该标志就为true,表示客户端正在运行状态。我们知道,当客户端DFSClient连接到Namenode的时候,实际上是创建了一个到Namenode的RPC连接,Namenode作为Server角色,DFSClient作为Client角色,它们之间建立起Socket连接。只有显式调用DFSClient的close方法时,才会修改clientRunning的值为false,实际上真正地关闭了已经建立的RPC连接。
我们看一下创建DFSInputStream的构造方法实现:

1 DFSInputStream(String src, int buffersize, boolean verifyChecksum) throws IOException {
2 this.verifyChecksum = verifyChecksum;
3 this.buffersize = buffersize;
4 this.src = src;
5 prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
6 openInfo();
7 }

先设置了几个与读取文件相关的参数值,这里有一个预先读取文件的Block字节数的参数prefetchSize,它的值设置如下:

1 public static final long DEFAULT_BLOCK_SIZE = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
2 public static final long DFS_BLOCK_SIZE_DEFAULT = 64*1024*1024;
3
4 defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
5 private long prefetchSize = 10 * defaultBlockSize;

这个prefetchSize的值默认为10*64*1024*1024=671088640,也就是说,默认预读取一个文件的10个块,即671088640B=640M,如果想要修改这个值,设置dfs.block.size即可覆盖默认值。
然后调用了openInfo方法,从Namenode获取到该打开文件的信息,在openInfo方法中,具体实现如下所示:

01 synchronized void openInfo() throws IOException {
02 for (int retries = 3; retries > 0; retries--) {
03 if (fetchLocatedBlocks()) { // fetch block success. 如果成功获取到待读取文件对应的Block列表,则直接返回
04 return;
05 } else {
06 // Last block location unavailable. When a cluster restarts,
07 // DNs may not report immediately. At this time partial block
08 // locations will not be available with NN for getting the length.
09 // Lets retry a few times to get the length.
10 DFSClient.LOG.warn("Last block locations unavailable. "
11 + "Datanodes might not have reported blocks completely."
12 + " Will retry for " + retries + " times");
13 waitFor(4000);
14 }
15 }
16 throw new IOException("Could not obtain the last block locations.");
17 }

上述代码中,有一个for循环用来获取Block列表。如果成功获取到待读取文件的Block列表,则直接返回,否则,最多执行3次等待重试操作(最多花费时间大于12秒)。未能成功读取文件的Block列表信息,是因为Namenode无法获取到文件对应的块列表的信息,当整个集群启动的时候,Datanode会主动向NNamenode上报对应的Block信息,只有Block Report完成之后,Namenode就能够知道组成文件的Block及其所在Datanode列表的信息。openInfo方法方法中调用了fetchLocatedBlocks方法,用来与Namenode进行RPC通信调用,实际获取对应的Block列表,实现代码如下所示:

01 private boolean fetchLocatedBlocks() throws IOException,
02 FileNotFoundException {
03 LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
04 if (newInfo == null) {
05 throw new FileNotFoundException("File does not exist: " + src);
06 }
07
08 if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() && !newInfo.isUnderConstruction()) {
09 Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
10 Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
11 while (oldIter.hasNext() && newIter.hasNext()) {
12 if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
13 throw new IOException("Blocklist for " + src + " has changed!");
14 }
15 }
16 }
17 boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
18 this.locatedBlocks = newInfo;
19 this.currentNode = null;
20 return isBlkInfoUpdated;
21 }

调用callGetBlockLocations方法,实际上是根据创建RPC连接以后得到的Namenode的代理对象,调用Namenode来获取到指定文件的Block的位置信息(位于哪些Datanode节点上):namenode.getBlockLocations(src, start, length)。调用callGetBlockLocations方法返回一个LocatedBlocks对象,该对象包含了文件长度信息、List blocks列表对象,其中LocatedBlock包含了一个Block的基本信息:

1 private Block b;
2 private long offset; // offset of the first byte of the block in the file
3 private DatanodeInfo[] locs;
4 private boolean corrupt;

有了这些文件的信息(文件长度、文件包含的Block的位置等信息),DFSClient就能够执行后续读取文件数据的操作了,详细过程我们在后面分析说明。

通过Namenode获取文件信息

上面,我们提到获取一个文件的基本信息,是通过Namenode来得到的,这里详细分析Namenode是如何获取到这些文件信息的,实现方法getBlockLocations的代码,如下所示:

1 public LocatedBlocks getBlockLocations(String src, long offset, long length) throwsIOException {
2 myMetrics.incrNumGetBlockLocations();
3 return namesystem.getBlockLocations(getClientMachine(), src, offset, length);
4 }

可以看到,Namenode又委托管理HDFS name元数据的FSNamesystem的getBlockLocations方法实现:

01 LocatedBlocks getBlockLocations(String clientMachine, String src, long offset, longlength) throws IOException {
02 LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true, true);
03 if (blocks != null) {
04 //sort the blocks
05 // In some deployment cases, cluster is with separation of task tracker
06 // and datanode which means client machines will not always be recognized
07 // as known data nodes, so here we should try to get node (but not
08 // datanode only) for locality based sort.
09 Node client = host2DataNodeMap.getDatanodeByHost(clientMachine);
10 if (client == null) {
11 List<String> hosts = new ArrayList<String> (1);
12 hosts.add(clientMachine);
13 String rName = dnsToSwitchMapping.resolve(hosts).get(0);
14 if (rName != null)
15 client = new NodeBase(clientMachine, rName);
16 }
17
18 DFSUtil.StaleComparator comparator = null;
19 if (avoidStaleDataNodesForRead) {
20 comparator = new DFSUtil.StaleComparator(staleInterval);
21 }
22 // Note: the last block is also included and sorted
23 for (LocatedBlock b : blocks.getLocatedBlocks()) {
24 clusterMap.pseudoSortByDistance(client, b.getLocations());
25 if (avoidStaleDataNodesForRead) {
26 Arrays.sort(b.getLocations(), comparator);
27 }
28 }
29 }
30 return blocks;
31 }

跟踪代码,最终会在下面的方法中实现了,如何获取到待读取文件的Block的元数据列表,以及如何取出该文件的各个Block的数据,方法实现代码,这里我做了详细的注释,可以参考,如下所示:

01 private synchronized LocatedBlocks getBlockLocationsInternal(String src,
02 long offset,
03 long length,
04 int nrBlocksToReturn,
05 boolean doAccessTime,
06 boolean needBlockToken)
07 throws IOException {
08 INodeFile inode = dir.getFileINode(src); // 获取到与待读取文件相关的inode数据
09 if (inode == null) {
10 return null;
11 }
12 if (doAccessTime && isAccessTimeSupported()) {
13 dir.setTimes(src, inode, -1, now(), false);
14 }
15 Block[] blocks = inode.getBlocks(); // 获取到文件src所包含的Block的元数据列表信息
16 if (blocks == null) {
17 return null;
18 }
19 if (blocks.length == 0) { // 获取到文件src的Block数,这里=0,该文件的Block数据还没创建,可能正在创建
20 return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
21 }
22 List<LocatedBlock> results;
23 results = new ArrayList<LocatedBlock>(blocks.length);
24
25 int curBlk = 0; // 当前Block在Block[] blocks数组中的索引位置
26 long curPos = 0, blkSize = 0; // curPos表示某个block在文件中的字节偏移量,blkSize为Block的大小(字节数)
27 int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; // 获取到文件src的Block数,实际上一定>0,但是第一个block大小可能为0,这种情况认为nrBlocks=0
28 for (curBlk = 0; curBlk < nrBlocks; curBlk++) { // 根据前面代码,我们知道offset=0,所以这个循环第一次进来肯定就break出去了(正常的话,blkSize>0,所以我觉得这段代码写的稍微有点晦涩)
29 blkSize = blocks[curBlk].getNumBytes();
30 assert blkSize > 0 : "Block of size 0";
31 if (curPos + blkSize > offset) {
32 break;
33 }
34 curPos += blkSize;
35 }
36
37 if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file, 到这里curBlk=0,如果从文件src的第一个Block的字节数累加计算,知道所有的Block的字节数都累加上了,总字节数仍然<=请求的offset,说明即使到了文件尾部,仍然没有达到offset的值。从前面fetchLocatedBlocks()方法中调用我们知道,offset=0,所以执行该分支表示文件src没有可用的Block数据块可读
38 return null;
39
40 long endOff = offset + length; //
41
42 do {
43 // 获取Block所在位置(Datanode节点)
44 int numNodes = blocksMap.numNodes(blocks[curBlk]); // 计算文件src中第curBlk个Block存储在哪些Datanode节点上
45 int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas(); // 计算存储文件src中第curBlk个Block但无法读取该Block的Datanode节点数
46 int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); // 计算FSNamesystem在内存中维护的Block=>Datanode映射的列表中,无法读取该Block的Datanode节点数
47 if (numCorruptNodes != numCorruptReplicas) {
48 LOG.warn("Inconsistent number of corrupt replicas for "
49 + blocks[curBlk] + "blockMap has " + numCorruptNodes
50 + " but corrupt replicas map has " + numCorruptReplicas);
51 }
52 DatanodeDescriptor[] machineSet = null; // 下面的if...else用来获取一个Block所在的Datanode节点
53 boolean blockCorrupt = false;
54 if (inode.isUnderConstruction() && curBlk == blocks.length - 1
55 && blocksMap.numNodes(blocks[curBlk]) == 0) { // 如果文件正在创建,当前blocks[curBlk]还没有创建成功(即没有可用的Datanode可以提供该Block的服务),仍然返回待创建Block所在的Datanode节点列表。数据块是在Datanode上存储的,只要Datanode完成数据块的存储后,通过heartbeat将数据块的信息上报给Namenode后,这些信息才会存储到blocksMap中
56 // get unfinished block locations
57 INodeFileUnderConstruction cons = (INodeFileUnderConstruction) inode;
58 machineSet = cons.getTargets();
59 blockCorrupt = false;
60 } else { // 文件已经创建完成
61 blockCorrupt = (numCorruptNodes == numNodes); // 是否当前的Block在所有Datanode节点上的副本都坏掉,无法提供服务
62 int numMachineSet = blockCorrupt ? numNodes : (numNodes - numCorruptNodes); // 如果是,则返回所有Datanode节点,否则,只返回可用的Block副本所在的Datanode节点
63 machineSet = new DatanodeDescriptor[numMachineSet];
64 if (numMachineSet > 0) { // 获取到当前Block所有副本所在的Datanode节点列表
65 numNodes = 0;
66 for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
67 DatanodeDescriptor dn = it.next();
68 boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
69 if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
70 machineSet[numNodes++] = dn;
71 }
72 }
73 }
74 LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos, blockCorrupt); // 创建一个包含Block的元数据对象、所在Datanode节点列表、起始索引位置(字节数)、健康状况的LocatedBlock对象
75 if (isAccessTokenEnabled && needBlockToken) { // 如果启用Block级的令牌(Token)访问,则为当前用户生成读模式的令牌信息,一同封装到返回的LocatedBlock对象中
76 b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
77 }
78
79 results.add(b); // 收集待返回给读取文件的客户端需要的LocatedBlock列表
80 curPos += blocks[curBlk].getNumBytes();
81 curBlk++;
82 } while (curPos < endOff && curBlk < blocks.length && results.size() < nrBlocksToReturn);
83
84 return inode.createLocatedBlocks(results); // 将收集的LocatedBlock列表数据封装到一个LocatedBlocks对象中返回
85 }

我们可以看一下,最后的调用inode.createLocatedBlocks(results)生成LocatedBlocks对象的实现,代码如下所示:

1 LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
2 return new LocatedBlocks(computeContentSummary().getLength(), blocks, isUnderConstruction()); // 通过ContentSummary对象获取到文件的长度
3 }

客户端通过RPC调用,获取到了文件对应的Block以及所在Datanode列表的信息,然后就可以根据LocatedBlocks来进一步获取到对应的Block对应的物理数据块。

对Block列表进行排序

我们再回到FSNamesystem类,调用getBlockLocationsInternal方法的getBlockLocations方法中,在返回文件block列表LocatedBlocks之后,会对每一个Block所在的Datanode进行的一个排序,排序的基本规则有如下2点:

  • Client到Block所在的Datanode的距离最近,这个是通过网络拓扑关系来进行计算,例如Client的网络路径为/dc1/r1/c1,那么路径为/dc1/r1/dn1的Datanode就比路径为/dc1/r2/dn2的距离小,/dc1/r1/dn1对应的Block就会排在前面
  • 从上面一点可以推出,如果Client就是某个Datanode,恰好某个Block的Datanode列表中包括该Datanode,则该Datanode对应的Block排在前面
  • Block所在的Datanode列表中,如果其中某个Datanode在指定的时间内没有向Namenode发送heartbeat(默认由常量DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT定义,默认值为30s),则该Datanode的状态即为STALE,具有该状态的Datanode对应的Block排在后面

基于上述规则排序后,Block列表返回到Client。

Client与Datanode交互更新文件Block列表

我们要回到前面分析的DFSClient.DFSInputStream.fetchLocatedBlocks()方法中,查看在调用该方法之后,是如何执行实际处理逻辑的:

01 private boolean fetchLocatedBlocks() throws IOException,
02 FileNotFoundException {
03 LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize); // RPC调用向Namenode获取待读取文件对应的Block及其位置信息LocatedBlocks对象
04 if (newInfo == null) {
05 throw new FileNotFoundException("File does not exist: " + src);
06 }
07
08 if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() && !newInfo.isUnderConstruction()) { // 这里面locatedBlocks!=null是和后面调用updateBlockInfo方法返回的状态有关的
09 Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
10 Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
11 while (oldIter.hasNext() && newIter.hasNext()) { // 检查2次获取到的LocatedBlock列表:第2次得到newInfo包含的Block列表,在第2次得到的locatedBlocks中是否发生变化,如果发生了变化,则不允许读取,抛出异常
12 if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
13 throw new IOException("Blocklist for " + src + " has changed!");
14 }
15 }
16 }
17 boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
18 this.locatedBlocks = newInfo;
19 this.currentNode = null;
20 return isBlkInfoUpdated;
21 }

如果第一次读取该文件时,已经获取到了对应的block列表,缓存在客户端;如果客户端第二次又读取了该文件,仍然获取到一个block列表对象。在两次读取之间,可能存在原文件完全被重写的情况,所以新得到的block列表与原列表完全不同了,存在这种情况,客户端直接抛出IO异常,如果原文件对应的block列表没有变化,则更新客户端缓存的对应block列表信息。
当集群重启的时候(如果允许安全模式下读文件),或者当一个文件正在创建的时候,Datanode向Namenode进行Block Report,这个过程中可能Namenode还没有完全重建好Block到Datanode的映射关系信息,所以即使在这种情况下,仍然会返回对应的正在创建的Block所在的Datanode列表信息,可以从前面getBlockLocationsInternal方法中看到,INode的对应UnderConstruction状态为true。这时,一个Block对应的所有副本中的某些可能还在创建过程中。
上面方法中,调用updateBlockInfo来更新文件的Block元数据列表信息,对于文件的某些Block可能没有创建完成,所以Namenode所保存的关于文件的Block的的元数据信息可能没有及时更新(Datanode可能还没有完成Block的报告),代码实现如下所示:

01 private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException {
02 if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction() || !(newInfo.locatedBlockCount() > 0)) { // 如果获取到的newInfo可以读取文件对应的Block信息,则返回true
03 return true;
04 }
05
06 LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1); // 从Namenode获取文件的最后一个Block的元数据对象LocatedBlock
07 boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo.getFileLength());
08 if (!lastBlockInFile) { // 如果“文件长度 != 最后一个块起始偏移量 + 最后一个块长度”,说明文件对应Block的元数据信息还没有更新,但是仍然返回给读取文件的该客户端
09 return true;
10 }
11 // 这时,已经确定last是该文件的最后一个bolck,检查最后个block的存储位置信息
12 if (last.getLocations().length == 0) {
13 return false;
14 }
15
16 ClientDatanodeProtocol primary = null;
17 Block newBlock = null;
18 for (int i = 0; i < last.getLocations().length && newBlock == null; i++) { // 根据从Namenode获取到的LocatedBlock last中对应的Datanode列表信息,Client与Datanode建立RPC连接,获取最后一个Block的元数据
19 DatanodeInfo datanode = last.getLocations()[i];
20 try {
21 primary = createClientDatanodeProtocolProxy(datanode, conf, last .getBlock(), last.getBlockToken(), socketTimeout, connectToDnViaHostname);
22 newBlock = primary.getBlockInfo(last.getBlock());
23 } catch (IOException e) {
24 if (e.getMessage().startsWith(
25 "java.io.IOException: java.lang.NoSuchMethodException: "
26 + "org.apache.hadoop.hdfs.protocol"
27 + ".ClientDatanodeProtocol.getBlockInfo")) {
28 // We're talking to a server that doesn't implement HDFS-200.
29 serverSupportsHdfs200 = false;
30 } else {
31 LOG.info("Failed to get block info from "
32 + datanode.getHostName() + " probably does not have "
33 + last.getBlock(), e);
34 }
35 } finally {
36 if (primary != null) {
37 RPC.stopProxy(primary);
38 }
39 }
40 }
41
42 if (newBlock == null) { // Datanode上不存在最后一个Block对应的元数据信息,直接返回
43 if (!serverSupportsHdfs200) {
44 return true;
45 }
46 throw new IOException("Failed to get block info from any of the DN in pipeline: "+ Arrays.toString(last.getLocations()));
47 }
48
49 long newBlockSize = newBlock.getNumBytes();
50 long delta = newBlockSize - last.getBlockSize();
51 // 对于文件的最后一个Block,如果从Namenode获取到的元数据,与从Datanode实际获取到的元数据不同,则以Datanode获取的为准,因为可能Datanode还没有及时将Block的变化信息向Namenode汇报
52 last.getBlock().setNumBytes(newBlockSize);
53 long newlength = newInfo.getFileLength() + delta;
54 newInfo.setFileLength(newlength); // 修改文件Block和位置元数据列表信息
55 LOG.debug("DFSClient setting last block " + last + " to length " + newBlockSize + " filesize is now " + newInfo.getFileLength());
56 return true;
57 }

我们看一下,在updateBlockInfo方法中,返回false的情况:Client向Namenode发起的RPC请求,已经获取到了组成该文件的数据块的元数据信息列表,但是,文件的最后一个数据块的存储位置信息无法获取到,说明Datanode还没有及时通过block report将数据块的存储位置信息报告给Namenode。通过在openInfo()方法中可以看到,获取文件的block列表信息有3次重试机会,也就是调用updateBlockInfo方法返回false,可以有12秒的时间,等待Datanode向Namenode汇报文件的最后一个块的位置信息,以及Namenode更新内存中保存的文件对应的数据块列表元数据信息。
我们再看一下,在updateBlockInfo方法中,返回true的情况:

  • 文件已经创建完成,文件对应的block列表元数据信息可用
  • 文件正在创建中,但是当前能够读取到的已经完成的最后一个块(非组成文件的最后一个block)的元数据信息可用
  • 文件正在创建中,文件的最后一个block的元数据部分可读:从Namenode无法获取到该block对应的位置信息,这时Client会与Datanode直接进行RPC通信,获取到该文件最后一个block的位置信息

上面Client会与Datanode直接进行RPC通信,获取文件最后一个block的元数据,这时可能由于网络问题等等,无法得到文件最后一个block的元数据,所以也会返回true,也就是说,Client仍然可以读取该文件,只是无法读取到最后一个block的数据。
这样,在Client从Namenode/Datanode获取到的文件的Block列表元数据已经是可用的信息,可以根据这些信息读取到各个Block的物理数据块内容了,准确地说,应该是文件处于打开状态了,已经准备好后续进行的读操作了。

目录
相关文章
|
24天前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
86 34
|
3月前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
71 2
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
|
3月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
50 3
|
3月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
54 2
|
3月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
50 2
|
3月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
47 1
|
3月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
59 1
|
5月前
|
存储 分布式计算 Hadoop
|
6月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
SQL JSON 数据处理
实时计算 Flink版产品使用问题之把hdfs集群里的core-site.xml hdfs.xml两个文件放到flink/conf/目录下,启动集群说找不到hdfs,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章