1. Introduction
DFSClient hedged read is a feature introduced in Hadoop 2.4.0. If a read against one block replica is slow, the DFSClient starts up a second ("hedged") read against a different replica. Whichever read finishes first is used, and the other is cancelled. This feature helps rein in tail latency, for example a read that takes unusually long because it hits a bad disk.
2. Enabling the Feature
Hedged reads are disabled by default. To enable them, configure the following two properties:
1. dfs.client.hedged.read.threadpool.size
The size of the thread pool used for concurrent hedged reads; a value greater than 0 enables the feature.
2. dfs.client.hedged.read.threshold.millis
How long to wait (in milliseconds) before starting a hedged read.
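For example, the feature could be switched on in hdfs-site.xml along these lines (the values below are illustrative, not tuned recommendations):

```xml
<!-- hdfs-site.xml: enable DFSClient hedged reads (example values) -->
<property>
  <name>dfs.client.hedged.read.threadpool.size</name>
  <value>10</value>
</property>
<property>
  <name>dfs.client.hedged.read.threshold.millis</name>
  <value>500</value>
</property>
```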
3. Implementation Analysis
1. DFSClient implementation
DFSClient defines a static thread pool:
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
The DFSClient constructor contains the following logic:
  this.hedgedReadThresholdMillis = conf.getLong(
      DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
      DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
  int numThreads = conf.getInt(
      DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
      DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
  if (numThreads > 0) {
    this.initThreadsNumForHedgedReads(numThreads);
  }
So dfs.client.hedged.read.threadpool.size decides whether the thread pool is instantiated at all. The initThreadsNumForHedgedReads() method looks like this:
  /**
   * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
   * it does not already exist.
   * @param num Number of threads for hedged reads thread pool.
   * If zero, skip hedged reads thread pool creation.
   */
  private synchronized void initThreadsNumForHedgedReads(int num) {
    if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
    HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        new Daemon.DaemonFactory() {
          private final AtomicInteger threadIndex = new AtomicInteger(0);

          @Override
          public Thread newThread(Runnable r) {
            Thread t = super.newThread(r);
            t.setName("hedgedRead-" + threadIndex.getAndIncrement());
            return t;
          }
        },
        new ThreadPoolExecutor.CallerRunsPolicy() {
          @Override
          public void rejectedExecution(Runnable runnable,
              ThreadPoolExecutor e) {
            LOG.info("Execution rejected, Executing in current thread");
            HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
            // will run in the current thread
            super.rejectedExecution(runnable, e);
          }
        });
    HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Using hedged reads; pool threads=" + num);
    }
  }
This instantiates a ThreadPoolExecutor with a corePoolSize of 1, a maximumPoolSize taken from the configuration, a SynchronousQueue (a blocking queue with no internal buffering) as the work queue, and Hadoop's own daemon-thread ThreadFactory. It also installs a custom RejectedExecutionHandler derived from CallerRunsPolicy: when a task is rejected because the pool is saturated, it increments the HEDGED_READ_METRIC.incHedgedReadOpsInCurThread() counter and then, via the superclass, runs the task in the submitting thread.
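The effect of this pool configuration can be seen in isolation with a minimal sketch (class and method names here are illustrative, not part of HDFS): with a SynchronousQueue and the pool at maximumPoolSize, a new submission is rejected, the handler increments a counter, and the task runs synchronously in the caller's thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
    static final AtomicInteger REJECTED_COUNT = new AtomicInteger(0);

    static String whereDidItRun() throws Exception {
        // Core 1, max 1, SynchronousQueue: no buffering at all, as in HDFS.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                    REJECTED_COUNT.incrementAndGet(); // count the rejection (incremented, as in HDFS)
                    super.rejectedExecution(r, e);    // then run r in the calling thread
                }
            });
        CountDownLatch block = new CountDownLatch(1);
        // Saturate the single pool thread with a task that waits on the latch.
        pool.execute(() -> { try { block.await(); } catch (InterruptedException ie) { } });
        // The queue has no capacity and the pool is at its maximum, so this
        // task is rejected and executes synchronously in the current thread.
        String caller = Thread.currentThread().getName();
        String[] ranOn = new String[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        block.countDown();
        pool.shutdown();
        return caller.equals(ranOn[0]) ? "caller" : "pool";
    }

    public static void main(String[] args) throws Exception {
        System.out.println(whereDidItRun());
    }
}
```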
Finally, DFSClient exposes a few getters and setters for the input stream to call:
  long getHedgedReadTimeout() {
    return this.hedgedReadThresholdMillis;
  }

  @VisibleForTesting
  void setHedgedReadTimeout(long timeoutMillis) {
    this.hedgedReadThresholdMillis = timeoutMillis;
  }

  ThreadPoolExecutor getHedgedReadsThreadPool() {
    return HEDGED_READ_THREAD_POOL;
  }

  boolean isHedgedReadsEnabled() {
    return (HEDGED_READ_THREAD_POOL != null) &&
        HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
  }

  DFSHedgedReadMetrics getHedgedReadMetrics() {
    return HEDGED_READ_METRIC;
  }
2. DFSInputStream implementation
In DFSInputStream's read path, dfsClient.isHedgedReadsEnabled() determines whether the feature is enabled; if so, hedgedFetchBlockByteRange() is called to read the data:
  if (dfsClient.isHedgedReadsEnabled()) {
    hedgedFetchBlockByteRange(blk, targetStart,
        targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
  } else {
    fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
        buffer, offset, corruptedBlockMap);
  }
hedgedFetchBlockByteRange() implements hedged reads using an ExecutorCompletionService and a list of Futures. Concretely:
1. Build a list of futures:
  ArrayList<Future<ByteBuffer>> futures =
      new ArrayList<Future<ByteBuffer>>();
2. Build an ExecutorCompletionService on top of the hedged read thread pool:
  CompletionService<ByteBuffer> hedgedService =
      new ExecutorCompletionService<ByteBuffer>(
          dfsClient.getHedgedReadsThreadPool());
3. Fetch the block and compute the read length:
  ByteBuffer bb = null;
  int len = (int) (end - start + 1);
  block = getBlockAt(block.getStartOffset(), false);
4. Inside a while loop, handle two cases:
1) On the first read: pick a DataNode via chooseDataNode (which may go back to the NameNode for block locations), build a Callable, submit it to hedgedService to obtain Future<ByteBuffer> firstRequest, then
poll for the result with a timeout. If the future completed successfully, return; otherwise add this node to ignored so it is skipped on the next pass, bump the incHedgedReadOps counter, and continue the loop.
2) On subsequent passes: pick a DataNode via getBestNodeDNAddrPair (falling back to chooseDataNode), build a Callable, and submit it to hedgedService. Once getFirstToComplete returns the first successful result, call cancelAll to cancel the remaining requests and update the metrics; on failure, likewise update the metrics and add this DataNode to the ignored set.
getFirstToComplete itself relies on the blocking hedgedService.take().
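Stripped of the HDFS details, the poll-then-take pattern described above can be sketched as follows. This is a simplified, HDFS-independent illustration (all names and the sleep duration are made up for the example): the primary request is deliberately slow, the threshold expires, a hedged request is spawned, and take() returns whichever finishes first.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedDemo {
    static String hedgedCall(long thresholdMillis) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        // Primary request: deliberately slow, standing in for a read from a slow replica.
        service.submit(() -> { Thread.sleep(500); return "primary"; });
        // Wait up to the threshold, mirroring hedgedService.poll() with
        // dfs.client.hedged.read.threshold.millis.
        Future<String> first = service.poll(thresholdMillis, TimeUnit.MILLISECONDS);
        if (first != null) {
            pool.shutdownNow();
            return first.get(); // primary was fast enough; no hedged read needed
        }
        // Threshold expired: spawn a hedged request against another "replica".
        service.submit(() -> "hedged");
        // take() blocks until whichever request completes first,
        // as getFirstToComplete() does.
        String winner = service.take().get();
        pool.shutdownNow(); // cancel the loser, as cancelAll() does
        return winner;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(hedgedCall(50)); // the fast hedged request wins
    }
}
```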
The actual code is as follows:
  while (true) {
    // see HDFS-6591, this metric is used to verify/catch unnecessary loops
    hedgedReadOpsLoopNumForTesting++;
    DNAddrPair chosenNode = null;
    // there is no request already executing.
    if (futures.isEmpty()) {
      // chooseDataNode is a commitment. If no node, we go to
      // the NN to reget block locations. Only go here on first read.
      chosenNode = chooseDataNode(block, ignored);
      bb = ByteBuffer.wrap(buf, offset, len);
      Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
          chosenNode, block, start, end, bb, corruptedBlockMap);
      Future<ByteBuffer> firstRequest = hedgedService
          .submit(getFromDataNodeCallable);
      futures.add(firstRequest);
      try {
        Future<ByteBuffer> future = hedgedService.poll(
            dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
        if (future != null) {
          future.get();
          return;
        }
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
              + "ms to read from " + chosenNode.info
              + "; spawning hedged read");
        }
        // Ignore this node on next go around.
        ignored.add(chosenNode.info);
        dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        continue; // no need to refresh block locations
      } catch (InterruptedException e) {
        // Ignore
      } catch (ExecutionException e) {
        // Ignore already logged in the call.
      }
    } else {
      // We are starting up a 'hedged' read. We have a read already
      // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
      // If no nodes to do hedged reads against, pass.
      try {
        try {
          chosenNode = getBestNodeDNAddrPair(block, ignored);
        } catch (IOException ioe) {
          chosenNode = chooseDataNode(block, ignored);
        }
        bb = ByteBuffer.allocate(len);
        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
            chosenNode, block, start, end, bb, corruptedBlockMap);
        Future<ByteBuffer> oneMoreRequest = hedgedService
            .submit(getFromDataNodeCallable);
        futures.add(oneMoreRequest);
      } catch (IOException ioe) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Failed getting node for hedged read: "
              + ioe.getMessage());
        }
      }
      // if not succeeded. Submit callables for each datanode in a loop, wait
      // for a fixed interval and get the result from the fastest one.
      try {
        ByteBuffer result = getFirstToComplete(hedgedService, futures);
        // cancel the rest.
        cancelAll(futures);
        if (result.array() != buf) { // compare the array pointers
          dfsClient.getHedgedReadMetrics().incHedgedReadWins();
          System.arraycopy(result.array(), result.position(), buf, offset,
              len);
        } else {
          dfsClient.getHedgedReadMetrics().incHedgedReadOps();
        }
        return;
      } catch (InterruptedException ie) {
        // Ignore and retry
      }
      // We got here if exception. Ignore this node on next go around IFF
      // we found a chosenNode to hedge read against.
      if (chosenNode != null && chosenNode.info != null) {
        ignored.add(chosenNode.info);
      }
    }
  }
And getFirstToComplete is implemented as follows:
  private ByteBuffer getFirstToComplete(
      CompletionService<ByteBuffer> hedgedService,
      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
    if (futures.isEmpty()) {
      throw new InterruptedException("let's retry");
    }
    Future<ByteBuffer> future = null;
    try {
      future = hedgedService.take();
      ByteBuffer bb = future.get();
      futures.remove(future);
      return bb;
    } catch (ExecutionException e) {
      // already logged in the Callable
      futures.remove(future);
    } catch (CancellationException ce) {
      // already logged in the Callable
      futures.remove(future);
    }
    throw new InterruptedException("let's retry");
  }
over...