在《HDFS源码分析DataXceiver之整体流程》一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver。并且,在线程DataXceiver处理请求的主方法run()方法内,会先读取操作符op,然后根据操作符op分别调用相应的方法进行请求的处理。而决定什么样的操作符op该调用何种方法的逻辑,则是在DataXceiver线程父类Receiver的processOp()方法中实现的,代码如下:
/** Process op by the corresponding method. */ protected final void processOp(Op op) throws IOException { // 通过调用相应的方法处理操作符 switch(op) { case READ_BLOCK:// 读数据块调用opReadBlock()方法 opReadBlock(); break; case WRITE_BLOCK:// 写数据块调用opWriteBlock()方法 opWriteBlock(in); break; case REPLACE_BLOCK:// 替换数据块调用opReplaceBlock()方法 opReplaceBlock(in); break; case COPY_BLOCK:// 复制数据块调用REPLACE()方法 opCopyBlock(in); break; case BLOCK_CHECKSUM:// 数据块检验调用opBlockChecksum()方法 opBlockChecksum(in); break; case TRANSFER_BLOCK:// 移动数据块调用opTransferBlock()方法 opTransferBlock(in); break; case REQUEST_SHORT_CIRCUIT_FDS: opRequestShortCircuitFds(in); break; case RELEASE_SHORT_CIRCUIT_FDS: opReleaseShortCircuitFds(in); break; case REQUEST_SHORT_CIRCUIT_SHM: opRequestShortCircuitShm(in); break; default: throw new IOException("Unknown op " + op + " in data stream"); } }接下来的几篇文章,我们将依次为大家介绍读数据块、写数据块、替换数据块、复制数据块、移动数据块等具体数据读写请求的处理。
那么今天,我们首先来看下第一种数据读写请求--读数据块READ_BLOCK,它是通过调用opReadBlock()方法完成的,我们先看下这个方法的代码:
/** Receive OP_READ_BLOCK */ private void opReadBlock() throws IOException { // 解析输入流,得到读数据块消息协议OpReadBlockProto,即proto OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in)); // 创建TraceScope类型的traceScope TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { // 调用readBlock()方法,完成读数据块操作 // 从读数据块消息协议OpReadBlockProto中分别获得需要读取的数据块block、访问令牌blockToken、客户端名称clientName、数据块读取的起始偏移量blockOffset、 // 数据读取的长度length、是否发送块校验sendChecksum、缓存策略CachingStrategy类型的cachingStrategy readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), proto.getOffset(), proto.getLen(), proto.getSendChecksums(), (proto.hasCachingStrategy() ? getCachingStrategy(proto.getCachingStrategy()) : CachingStrategy.newDefaultStrategy())); } finally { // 关闭traceScope if (traceScope != null) traceScope.close(); } }整个处理流程非常简单。首先,解析输入流,得到读数据块消息协议OpReadBlockProto,即proto,并创建TraceScope类型的traceScope;然后从读数据块消息协议proto中解析出读数据块的各种参数,比如需要读取的数据块block、访问令牌blockToken、客户端名称clientName、数据块读取的起始偏移量blockOffset、数据块读取的长度length、是否发送块校验sendChecksum、缓存策略CachingStrategy类型的cachingStrategy等,利用这些参数调用子类DataXceiver线程的readBlock()方法,进行读数据块的处理,最终关闭traceScope,整个数据块读取过程完毕。
我们再来看下其中涉及的部分细节。首先,在我们要概括性的讲解读数据块消息协议OpReadBlockProto前,我们先看下对于输入流是怎么处理的,答案就在类PBHelper中的vintPrefixed()方法中,其代码如下:
public static InputStream vintPrefixed(final InputStream input) throws IOException { // 从输入流input中读入第一个字节Byte final int firstByte = input.read(); if (firstByte == -1) { throw new EOFException("Premature EOF: no length prefix available"); } // CodedInputStream用来读取和解码协议消息字段。 // Varint是一种数值压缩存储方法 // readRawVarint32()方法从输入流中读取一个原始的Varint,并且,如果高于32位,丢弃之。 // firstByte是为了告诉CodedInputStream已经从输入流input中读取了1个字节 // 返回结果为int类型的消息大小 int size = CodedInputStream.readRawVarint32(firstByte, input); // 确保消息大小必须大于0 assert size >= 0; // 将输入流input包装成ExactSizeInputStream,从该输入流中只能读取size大小的数据 // ExactSizeInputStream是一种从其他输入流中读取固定大小数据的输入流。 return new ExactSizeInputStream(input, size); }首先呢,从输入流input中读入第一个字节Byte,然后调用CodedInputStream的readRawVarint32()方法,获取请求内容的大小。CodedInputStream用来读取和解码协议消息字段。Varint是一种数值压缩存储方法。readRawVarint32()方法从输入流中读取一个原始的Varint,并且,如果高于32位,丢弃之。firstByte是为了告诉CodedInputStream已经从输入流input中读取了1个字节,返回结果为int类型的消息大小,同时确保消息大小必须大于0。最后,将输入流input包装成ExactSizeInputStream,从该输入流中只能读取size大小的数据,ExactSizeInputStream是一种从其他输入流中读取固定大小数据的输入流。
接下来,我们再说下解析输入流,得到读数据块消息协议OpReadBlockProto。这个OpReadBlockProto是什么呢?它是谷歌开源的Protobuf在HDFS中定义的进行数据传输时的一种消息协议,其消息格式的定义在文件datatransfer.proto中,内容如下:
message OpReadBlockProto { required ClientOperationHeaderProto header = 1; required uint64 offset = 2; required uint64 len = 3; optional bool sendChecksums = 4 [default = true]; optional CachingStrategyProto cachingStrategy = 5; }其中,header、offset、len为必须的,因为它们使用了关键字required,而剩余两个sendChecksums、cachingStrategy则由于使用了关键字optional,所以为可选的。并且,header为ClientOperationHeaderProto类型,而ClientOperationHeaderProto也是一种消息格式,定义如下:
message ClientOperationHeaderProto { required BaseHeaderProto baseHeader = 1; required string clientName = 2; }其中,baseHeader还是Protobuf定义的一种消息格式,其名称为BaseHeaderProto,其定义如下:
message BaseHeaderProto { required ExtendedBlockProto block = 1; optional hadoop.common.TokenProto token = 2; optional DataTransferTraceInfoProto traceInfo = 3; }它包含了数据块block,即ExtendedBlockProto,所以,在获得读数据块消息协议OpReadBlockProto之后,调用readBlock()方法之前,我们可以使用如下语句:
PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()来获得readBlock()方法需要使用的参数ExtendedBlock。读数据块消息协议中的其他字段不再多一一介绍,读者可自行分析。
最后,我们来看下读取数据块的readBlock()方法,其代码如下:
@Override public void readBlock(final ExtendedBlock block, final Token<BlockTokenIdentifier> blockToken, final String clientName, final long blockOffset, final long length, final boolean sendChecksum, final CachingStrategy cachingStrategy) throws IOException { // 将请求中的客户端名称clientName赋值给previousOpClientName previousOpClientName = clientName; // 获取输出流baseStream,即socketOut OutputStream baseStream = getOutputStream(); // 将输出流baseStream依次包装成BufferedOutputStream、DataOutputStream, // 其缓冲区大小取参数io.file.buffer.size的一半, // 参数未配置的话默认为512,且最大也不能超过512 DataOutputStream out = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); // 访问权限检查 checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ); // send the block // 发送数据块 BlockSender blockSender = null; // 获取数据节点注册信息DatanodeRegistration DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block.getBlockPoolId()); final String clientTraceFmt = clientName.length() > 0 && ClientTraceLog.isInfoEnabled() ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress, "%d", "HDFS_READ", clientName, "%d", dnR.getDatanodeUuid(), block, "%d") : dnR + " Served block " + block + " to " + remoteAddress; // 更新当前线程名称:Sending block... updateCurrentThreadName("Sending block " + block); try { try { // 构造数据块发送器BlockSender对象blockSender // 构造时,需要对应数据块block、数据在块中的起始位置blockOffset、读取数据的长度length等信息 blockSender = new BlockSender(block, blockOffset, length, true, false, sendChecksum, datanode, clientTraceFmt, cachingStrategy); } catch(IOException e) { String msg = "opReadBlock " + block + " received exception " + e; LOG.info(msg); sendResponse(ERROR, msg); throw e; } // send op status // 发送操纵状态 writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream())); // 调用数据块发送器blockSender的sendBlock()方法,发送数据块 long read = blockSender.sendBlock(out, baseStream, null); // send data if (blockSender.didSendEntireByteRange()) { // If we sent the entire range, then we should expect the client // to respond with a Status enum. try { ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( PBHelper.vintPrefixed(in)); if (!stat.hasStatus()) { LOG.warn("Client " + peer.getRemoteAddressString() + " did not send a valid status code after reading. " + "Will close connection."); IOUtils.closeStream(out); } } catch (IOException ioe) { LOG.debug("Error reading client status response. Will close connection.", ioe); IOUtils.closeStream(out); } } else { IOUtils.closeStream(out); } // 数据节点datanode记录相关系统性能指标的增长,这里是读取的字节数、读取的块数 datanode.metrics.incrBytesRead((int) read); datanode.metrics.incrBlocksRead(); } catch ( SocketException ignored ) { if (LOG.isTraceEnabled()) { LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " + remoteAddress, ignored); } // Its ok for remote side to close the connection anytime. datanode.metrics.incrBlocksRead(); IOUtils.closeStream(out); } catch ( IOException ioe ) { /* What exactly should we do here? * Earlier version shutdown() datanode if there is disk error. */ LOG.warn(dnR + ":Got exception while serving " + block + " to " + remoteAddress, ioe); throw ioe; } finally { // 关闭数据块发送器 IOUtils.closeStream(blockSender); } //update metrics datanode.metrics.addReadBlockOp(elapsed()); datanode.metrics.incrReadsFromClient(peer.isLocal()); }readBlock()方法大体处理流程如下:
1、将请求中的客户端名称clientName赋值给previousOpClientName;
2、获取输出流baseStream,即socketOut;
3、将输出流baseStream依次包装成BufferedOutputStream、DataOutputStream,其缓冲区大小取参数io.file.buffer.size的一半,参数未配置的话默认为512,且最大也不能超过512;
4、调用checkAccess()方法进行访问权限检查;
5、发送数据块:
5.1、 获取数据节点注册信息DatanodeRegistration;
5.2、更新当前线程名称:Sending block...;
5.3、构造数据块发送器BlockSender对象blockSender,构造时,需要对应数据块block、数据在块中的起始位置blockOffset、读取数据的长度length等信息;
5.4、调用writeSuccessWithChecksumInfo()方法发送操作状态;
5.5、调用数据块发送器blockSender的sendBlock()方法,发送数据块;
5.6、数据节点datanode记录相关系统性能指标的增长,这里是读取的字节数、读取的块数;
5.7、关闭数据块发送器。
大体处理流程就是这个样子。而关于BlockSender及其构造、如何定位数据以及如何发送数据等,我们将会在专门的文章中进行分析,敬请期待!