一、综述
HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode、DataNode、DFSClient等众多角色的分工与合作。
首先上一段代码,客户端是如何写文件的:
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path file = new Path("demo.txt"); FSDataOutputStream outStream = fs.create(file); out.write("Welcome to HDFS Java API !!!".getBytes("UTF-8")); outStream.close();只有简单的6行代码,客户端封装的如此简洁,各组件间的RPC调用、异常处理、容错等均对客户端透明。
总体来说,最简单的HDFS写文件大体流程如下:
1、客户端获取文件系统实例FileSyStem,并通过其create()方法获取文件系统输出流outputStream;
1.1、首先会联系名字节点NameNode,通过ClientProtocol.create()RPC调用,在名字节点上创建文件元数据,并获取文件状态FileStatus;
1.2、通过文件状态FileStatus构造文件系统输出流outputStream;
2、通过文件系统输出流outputStream写入数据;
2.1、首次写入会首先向名字节点申请数据块,名字节点能够掌握集群DataNode整体状况,分配数据块后,连同DataNode列表信息返回给客户端;
2.2、客户端采用流式管道的方式写入数据节点列表中的第一个DataNode,并由列表中的前一个DataNode将数据转发给后面一个DataNode;
2.3、确认数据包由DataNode经过管道依次返回给上游DataNode和客户端;
2.4、写满一个数据块后,向名字节点提交一个数据;
2.5、再次重复2.1-2.4过程;
3、向名字节点提交文件(complete file),即告知名字节点文件已写完,然后关闭文件系统输出流outputStream等释放资源。
可以看出,在不考虑异常等的情况下,上述过程还是比较复杂的。本文,我将着重阐述下HDFS写数据时,客户端是如何实现的,关于NameNode、DataNode等的配合等,后续文章将陆续推出,敬请关注!
二、实现分析
我们将带着以下问题来分析客户端写入数据过程:
1、如何获取数据输出流?
2、如何通过数据输出流写入数据?
3、数据输出流关闭时都做了什么?
4、如果发生异常怎么办?即如何容错?
(一)如何获取数据输出流?
HDFS客户端获取数据流是一个复杂的过程,流程图如下:
以DistributedFileSystem为例,create()是其入口方法,DistributedFileSystem内部封装了一个DFS的客户端,如下:
DFSClient dfs;在DistributedFileSystem的初始化方法initialize()中,会构造这个文件系统客户端,如下:
this.dfs = new DFSClient(uri, conf, statistics);
而create()方法就是通过这个文件系统客户端dfs获取数据输出流的,如下:
@Override public FSDataOutputStream create(final Path f, final FsPermission permission, final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataOutputStream>() { /* * 创建文件系统数据输出流 */ @Override public FSDataOutputStream doCall(final Path p) throws IOException, UnresolvedLinkException { // 调用create()方法创建文件,并获取文件系统输出流 final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt); return dfs.createWrappedOutputStream(dfsos, statistics); } @Override public FSDataOutputStream next(final FileSystem fs, final Path p) throws IOException { return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt); } }.resolve(this, absF); }FileSystemLinkResolver是一个文件系统链接解析器(抽象类),我们待会再分析它,这里只要知道,该抽象类实例化后会通过resolve()方法--doCall()方法得到数据输出流即可。接着往下DFSClient的create()方法,省略部分代码,如下:
// 为create构建一个数据输出流 final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt), getFavoredNodesStr(favoredNodes)); // 开启文件租约 beginFileLease(result.getFileId(), result); return result;实际上,它又通过DFSOutputStream的newStreamForCreate()方法来获取数据输出流,并开启文件租约。租约的内容我们后续再讲,继续看下如何获取文件输出流的,如下:
/** * 为创建文件构造一个新的输出流 */ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException { TraceScope scope = dfsClient.getPathTraceScope("newStreamForCreate", src); try { HdfsFileStatus stat = null; // Retry the create if we get a RetryStartFileException up to a maximum // number of times boolean shouldRetry = true; int retryCount = CREATE_RETRY_COUNT; while (shouldRetry) { shouldRetry = false; try { // 首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态 stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS); break; } catch (RemoteException re) { IOException e = re.unwrapRemoteException( AccessControlException.class, DSQuotaExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class); if (e instanceof RetryStartFileException) { if (retryCount > 0) { shouldRetry = true; retryCount--; } else { throw new IOException("Too many retries because of encryption" + " zone operations", e); } } else { throw e; } } } Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); // 构造一个数据输出流 final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes); // 启动数据输出流 out.start(); return out; } finally { scope.close(); } }大体可以分为三步:
1、首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态HdfsFileStatus;
2、构造一个数据输出流;
3、启动数据输出流。
上述连接NameNode节点创建文件的过程中,如果发生瞬时错误,会充分利用重试机制,增加系统容错性。DFSClient中nameNode的Create()方法,实际上是调用的是客户端与名字节点间的RPC--ClientProtocol的create()方法,该方法的作用即是在NameNode上创建一个空文件,并返回文件状态。文件状态主要包括以下信息:
// 文件路径 private final byte[] path; // local name of the inode that's encoded in java UTF8 // 符号连接 private final byte[] symlink; // symlink target encoded in java UTF8 or null private final long length;// 文件长度 private final boolean isdir;// 是否为目录 private final short block_replication;// 数据块副本数 private final long blocksize;// 数据块大小 private final long modification_time;// 修改时间 private final long access_time;// 访问时间 private final FsPermission permission;// 权限 private final String owner;// 文件所有者 private final String group;// 文件所属组 private final long fileId;// 文件ID继续看如何构造一个数据输出流,实际上它是通过构造DFSOutputStream实例获取的,而DFSOutputStream的构造方法如下:
/** Construct a new output stream for creating a file. */ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException { this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); // 计算数据包块大小 computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); // 构造数据流对象 streamer = new DataStreamer(stat, null); if (favoredNodes != null && favoredNodes.length != 0) { streamer.setFavoredNodes(favoredNodes); } }首先计算数据包块大小,然后构造数据流对象,后续就依靠这个数据流对象来通过管道发送流式数据。接下来便是启动数据输出流,如下:
private synchronized void start() { streamer.start(); }很简单,实际上也就是启动数据流对象,通过这个数据流对象实现数据的发送。
中间为什么会有计算数据包块大小这一步呢?原来,数据的发送是通过一个个数据包发送出去的,而不是通过数据块发送的。设想下,如果按照一个数据块(默认128M)大小发送数据,合理吗?至于数据包大小是如何确定的,我们后续再讲。
(二)如何通过数据输出流写入数据?
下面,该看看如何通过数据输出流写入数据了。要解决这个问题,首先分析下DFSOutputStream和DataStreamer是什么。
1、DFSOutputStream
DFSOutputStream是分布式文件系统输出流,它内部封装了两个队列:发送数据包队列和确认数据包队列,如下:
// 发送数据包队列 private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>(); // 确认数据包队列 private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();客户端写入的数据,会addLast入发送数据包队列dataQueue,然后交给DataStreamer处理。
2、DataStreamer
DataStreamer是一个后台工作线程,它负责在数据流管道中往DataNode发送数据包。它从NameNode申请获取一个新的数据块ID和数据块位置,然后开始往DataNode的管道写入流式数据包。每个数据包都有一个序列号sequence number。当一个数据块所有的数据包被发送出去,并且每个数据包的确认信息acks被接收到的话,DataStreamer关闭当前数据块,然后再向NameNode申请下一个数据块。
所以,才会有上述发送数据包和确认数据包这两个队列。
DataStreamer内部有很多变量,大体如下:
// streamer关闭标志位 private volatile boolean streamerClosed = false; // 扩展块,它的长度是已经确认ack的bytes大小 private ExtendedBlock block; // its length is number of bytes acked private Token<BlockTokenIdentifier> accessToken; // 数据输出流 private DataOutputStream blockStream; // 数据输入流:即回复流 private DataInputStream blockReplyStream; // 响应处理器 private ResponseProcessor response = null; // 当前块的数据块列表 private volatile DatanodeInfo[] nodes = null; // list of targets for current block // 存储类型 private volatile StorageType[] storageTypes = null; // 存储ID private volatile String[] storageIDs = null; // 需要排除的节点 private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes = CacheBuilder.newBuilder() .expireAfterWrite( dfsClient.getConf().excludedNodesCacheExpiry, TimeUnit.MILLISECONDS) .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() { @Override public void onRemoval( RemovalNotification<DatanodeInfo, DatanodeInfo> notification) { DFSClient.LOG.info("Removing node " + notification.getKey() + " from the excluded nodes list"); } }) .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() { @Override public DatanodeInfo load(DatanodeInfo key) throws Exception { return key; } }); // 优先节点 private String[] favoredNodes; // 是否存在错误 volatile boolean hasError = false; volatile int errorIndex = -1; // Restarting node index // 从哪个节点重试的索引 AtomicInteger restartingNodeIndex = new AtomicInteger(-1); private long restartDeadline = 0; // Deadline of DN restart // 当前数据块构造阶段 private BlockConstructionStage stage; // block construction stage // 已发送数据大小 private long bytesSent = 0; // number of bytes that've been sent private final boolean isLazyPersistFile; /** Nodes have been used in the pipeline before and have failed. */ private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>(); /** The last ack sequence number before pipeline failure. */ // 管道pipeline失败前的最后一个确认包序列号 private long lastAckedSeqnoBeforeFailure = -1; // 管道恢复次数 private int pipelineRecoveryCount = 0; /** Has the current block been hflushed? */ // 当前数据块是否已被Hflushed private boolean isHflushed = false; /** Append on an existing block? */ // 是否需要在现有块上append private final boolean isAppend;有很多比较简单,不再赘述。这里只讲解几个比较重要的:
1、BlockConstructionStage stage
当前数据块构造阶段。针对create()这种写入 来说,开始时默认是BlockConstructionStage.PIPELINE_SETUP_CREATE,即管道初始化时需要向NameNode申请数据块及所在数据节点的状态,这个很容易理解。有了数据块和其所在数据节点所在列表,才能形成管道列表不是?在数据流传输过程中,即一个数据块写入的过程中,虽然有多次数据包写入,但状态始终为DATA_STREAMING,即正在流式写入的阶段。而当发生异常时,则是PIPELINE_SETUP_STREAMING_RECOVERY状态,即需要从流式数据中进行恢复,如果一个数据块写满,则会进入下一个周期,PIPELINE_SETUP_CREATE->DATA_STREAMING,最后数据全部写完后,状态会变成PIPELINE_CLOSE,并且如果发生异常的话,会有一个特殊状态对应,即PIPELINE_CLOSE_RECOVERY。而append开始时则是对应的状态PIPELINE_SETUP_APPEND及异常状态PIPELINE_SETUP_APPEND_RECOVERY,其它则一致。
2、volatile boolean hasError = false
这个状态位用来标记数据写入过程中,是否存在错误,方便进行容错。
3、ResponseProcessor response
响应处理器。这个也是后台工作线程,它会处理来自DataNode回复流中的确认包,确认数据是否发送成功,如果成功,将确认包从确认数据包队列中移除,否则进行容错处理。
create()模式下的DataStreamer构造比较简单,如下:
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) { isAppend = false; isLazyPersistFile = isLazyPersist(stat); this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; }isAppend设置为false,即不是append写入,BlockConstructionStage默认为PIPELINE_SETUP_CREATE,即需要向NameNode写入数据块。
我们首先看下DataStreamer是如何发送数据的。上面讲到过,DFSOutputStream中包括两个队列:发送数据包队列和确认数据包队列。这类似于两个生产者消--费者模型。针对发送数据包队列,外部写入者为生产者,DataStreamer为消费者。外部持续写入数据至发送数据包队列,DataStreamer则从中消费数据,判断是否需要申请数据块,然后写入数据节点流式管道。而确认数据包队列,DataStreamer为生产者,ResponseProcessor为消费者。首先,确认数据包队列数据的产生,是DataStreamer发送数据给DataNode后,从发送数据包队列挪过来的,而当ResponseProcessor线程确认接收到数据节点的ack确认包后,再从数据确认队列中删除。
关于ResponseProcessor线程,稍后再讲。
数据写入过程之DataStreamer
首先看DataStreamer的run()方法,它会在数据流没有关闭,且dfs客户端正在运行的情况下,一直循环,循环内处理的大体流程如下:
1、如果遇到一个错误(hasErro),且响应器尚未关闭,关闭响应器,使之join等待;
2、如果有DataNode相关IO错误,先预先处理,初始化一些管道和流的信息,并决定外部是否等待,等待意即可以进行容错处理,不等待则数目错误比较严重,无法进行容错处理:这里还判断了errorIndex标志位和restartingNodeIndex的大小,意思是是否是由某个具体数据节点引起的错误,如果是的话,这种错误理论上是可以处理的;
3、没有数据时,等待一个数据包发送:等待的条件是:当前流没有关闭(!streamerClosed)、没有错误(hasError)、dfs客户端正在 运行(dfsClient.clientRunning )、dataQueue队列大小为0,且当前阶段不是DATA_STREAMING,或者在需要sleep(doSleep)或者上次发包距离本次时间未超过阈值的情况下为DATA_STREAMING
意思是各种标记为正常,数据流处于正常发送的过程或者可控的非正常发送过程中,可控表现在状态位doSleep,即上传错误检查中认为理论上可以进行修复,但是需要sleep已完成recovery的初始化,或者距离上次发送未超过时间的阈值等。
4、如果数据流关闭、存在错误、客户端正常运行标志位异常时,执行continue:这个应该是对容错等的处理,让程序及时响应错误;
5、获取将要发送的数据包:
如果数据发送队列为空,构造一个心跳包;否则,取出队列中第一个元素,即待发送数据包。
6、如果当前阶段是PIPELINE_SETUP_CREATE,申请数据块,设置pipeline,初始化数据流:append的setup阶段则是通过setupPipelineForAppendOrRecovery()方法完成的,并同样会初始化数据流;
7、获取数据块中的上次数据位置lastByteOffsetInBlock,如果超过数据块大小,报错;
8、 如果是数据块的最后一个包:等待所有的数据包被确认,即等待datanodes的确认包acks,如果数据流关闭,或者数据节点IO存在错误,或者客户端不再正常运行,continue,设置阶段为pipeline关闭
9、发送数据包:将数据包从dataQueue队列挪至ackQueue队列,通知dataQueue的所有等待者,将数据写入远端的DataNode节点,并flush,如果发生异常,尝试标记主要的数据节点错误,方便容错处理;
10、更新已发送数据大小:可以看出,数据包中存储了其在数据块中的位置LastByteOffsetBlock,也就标记了已经发送数据的总大小;
11、数据块写满了吗?如果是最后一个数据块,等待确认包,调用endBlock()方法结束一个数据块 ;
如果上述流程发生错误,hasError标志位设置为true,并且如果不是一个DataNode引起的原因,流关闭标志设置为true。
最后,没有数据需要发送,或者发生致命错误的情况下,调用closeInternal()方法关闭内部资源。
未完待续,请关注《Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(2)》。
三、代码分析