This article is a source-code analysis based on Hadoop 2.6.5.
Client-Side Source Code Analysis
The client driver code:
public static void main(String[] args) throws Exception {
    // Create the configuration object
    Configuration conf = new Configuration(true);
    // Get the Job object
    Job job = Job.getInstance(conf);
    // Set the jar via the driver class
    job.setJarByClass(WcTest.class);
    // Specify the classes that handle the Map and Reduce phases
    job.setMapperClass(MyMapperTask.class);
    job.setReducerClass(MyReducerTask.class);
    // Specify the output key/value types of the Map phase
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Specify the job's input and output paths, passed in as command-line arguments
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Submit the job and wait for it to complete
    job.waitForCompletion(true);
}
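The driver references MyMapperTask and MyReducerTask, which the post does not show. For completeness, here is a minimal word-count style sketch of what they might look like; the class names come from the driver above, but the bodies are assumptions, not the original code:

// Hypothetical word-count implementations matching the driver's type settings
public static class MyMapperTask extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each line into words and emit (word, 1)
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}

public static class MyReducerTask extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for each word
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}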
1. The Configuration Object
Configuration is used to load and store the relevant configuration files. The class contains a static initializer block.
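The original post stops short of showing that block. A simplified sketch of what it does in 2.6.5 (the real block additionally warns if a deprecated hadoop-site.xml is found on the classpath):

static {
    // Register the default resources; they are parsed lazily the first time a value is read
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
}

Because of this, new Configuration(true) picks up core-default.xml and core-site.xml automatically.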
2. Obtaining the Job Object
Let's look at how the Job object is instantiated.
// Get the Job object
Job job = Job.getInstance(conf);
Step into the getInstance(conf) method.
public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
}
The Job class also contains a static initializer block.
Step into the loadResources method.
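The post does not include the code here; based on the 2.6.5 source, the static block in Job and ConfigUtil.loadResources() look roughly like this:

// In org.apache.hadoop.mapreduce.Job
static {
    ConfigUtil.loadResources();
}

// In org.apache.hadoop.mapreduce.util.ConfigUtil
public static void loadResources() {
    addDeprecatedKeys();
    // Register the MapReduce and YARN default configuration resources
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
    Configuration.addDefaultResource("yarn-default.xml");
    Configuration.addDefaultResource("yarn-site.xml");
}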
3. waitForCompletion
This method's execution is fairly involved, so let's analyze it step by step. In place of the simplified sequence diagram, the call chain is roughly: waitForCompletion → submit → connect / submitJobInternal → writeSplits → writeNewSplits → getSplits, and finally submitClient.submitJob.
3.1 waitForCompletion
public boolean waitForCompletion(boolean verbose)
        throws IOException, InterruptedException, ClassNotFoundException {
    // Check the job state; if it is still DEFINE, submit it
    if (state == JobState.DEFINE) {
        submit();
    }
    if (verbose) {
        // Monitor the job and print its progress
        monitorAndPrintJob();
    } else {
        // get the completion poll interval from the client.
        int completionPollIntervalMillis =
            Job.getCompletionPollInterval(cluster.getConf());
        while (!isComplete()) {
            try {
                // Poll periodically until the job completes
                Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
        }
    }
    return isSuccessful();
}
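As an aside, Job.getCompletionPollInterval reads a client-side setting; to my reading of the constants in Job it behaves roughly as sketched below (the key name and the 5-second default are those constants, shown here as an assumption):

// Simplified sketch of Job.getCompletionPollInterval
public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";
static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000; // 5 seconds

public static int getCompletionPollInterval(Configuration conf) {
    int interval = conf.getInt(COMPLETION_POLL_INTERVAL_KEY,
                               DEFAULT_COMPLETION_POLL_INTERVAL);
    // Fall back to the default if a non-positive value was configured
    if (interval < 1) {
        interval = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return interval;
}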
3.2 submit
Step into the submit method:
public void submit()
        throws IOException, InterruptedException, ClassNotFoundException {
    // Double-check the job state
    ensureState(JobState.DEFINE);
    // Use the new APIs by default
    setUseNewAPI();
    // Initialize the cluster object
    connect();
    // Build a JobSubmitter from the initialized cluster
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
                ClassNotFoundException {
            // Step into submitJobInternal, analyzed below
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    // Mark the job as RUNNING
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}
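connect() is where the Cluster object gets created. In the 2.6.5 source it looks roughly like the sketch below; the Cluster constructor then iterates over the available ClientProtocolProvider implementations (YarnClientProtocolProvider for YARN, LocalClientProtocolProvider for local mode) to build the ClientProtocol that later serves as submitClient:

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
        // Create the Cluster lazily, under the submitting user's UGI
        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run()
                    throws IOException, InterruptedException, ClassNotFoundException {
                return new Cluster(getConfiguration());
            }
        });
    }
}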
3.3 submitJobInternal
/**
 * Internally the job submission:
 *   - checks the input and output specifications of the job
 *   - computes the InputSplits for the job
 *   - sets up the requisite accounting information for the job's distributed cache, if necessary
 *   - copies the job's jar and configuration to the system directory on the distributed file system
 *   - submits the job and monitors its status
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
        throws ClassNotFoundException, InterruptedException, IOException {

    // Validate the job's output specification
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    // Add the MapReduce framework archive to the distributed cache
    addMRFrameworkToDistributedCache(conf);

    // Initialize the job's staging area and return its path
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        submitHostAddress = ip.getHostAddress();
        submitHostName = ip.getHostName();
        conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    // Ask the cluster for a new job ID
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // The job's submit directory: a sub-directory of jobStagingArea named after the job ID
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    // Apply a series of configuration settings
    try {
        conf.set(MRJobConfig.USER_NAME,
            UserGroupInformation.getCurrentUser().getShortUserName());
        conf.set("hadoop.http.filter.initializers",
            "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
        conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
        LOG.debug("Configuring job " + jobId + " with " + submitJobDir
            + " as the submit dir");
        // get delegation token for the dir
        TokenCache.obtainTokensForNamenodes(job.getCredentials(),
            new Path[] { submitJobDir }, conf);

        populateTokenCache(conf, job.getCredentials());

        // generate a secret to authenticate shuffle transfers
        if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
                keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
                keyGen.init(SHUFFLE_KEY_LENGTH);
            } catch (NoSuchAlgorithmException e) {
                throw new IOException("Error generating shuffle secret key", e);
            }
            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
                job.getCredentials());
        }

        // This method uploads the job's files to the submit directory
        copyAndConfigureFiles(job, submitJobDir);

        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

        // Create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
        // Internally, based on the earlier setting, this chooses either the
        // new-api or the old-api path to compute the input splits
        int maps = writeSplits(job, submitJobDir);
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        // write "queue admins of the queue to which job is being submitted"
        // to job file.
        String queue = conf.get(MRJobConfig.QUEUE_NAME,
            JobConf.DEFAULT_QUEUE_NAME);
        AccessControlList acl = submitClient.getQueueAdmins(queue);
        conf.set(toFullPropertyName(queue,
            QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

        // removing jobtoken referrals before copying the jobconf to HDFS
        // as the tasks don't need this setting, actually they may break
        // because of it if present as the referral will point to a
        // different job.
        TokenCache.cleanUpTokenReferral(conf);

        if (conf.getBoolean(
            MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
            MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
            // Add HDFS tracking ids
            ArrayList<String> trackingIds = new ArrayList<String>();
            for (Token<? extends TokenIdentifier> t :
                job.getCredentials().getAllTokens()) {
                trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
                trackingIds.toArray(new String[trackingIds.size()]));
        }

        // Write the job plan files: job.split, wc.jar ...
        writeConf(conf, submitJobFile);

        //
        // Now, actually submit the job (using the submit name)
        //
        // Submit the job
        printTokens(jobId, job.getCredentials());
        status = submitClient.submitJob(
            jobId, submitJobDir.toString(), job.getCredentials());
        if (status != null) {
            return status;
        } else {
            throw new IOException("Could not launch job");
        }
    } finally {
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
                jtFs.delete(submitJobDir, true);
        }
    }
}
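At this point the submit directory under the staging area contains the job's plan files. The JobSubmissionFiles helpers name these artifacts; the small standalone snippet below simply prints the well-known paths (the example staging path and job ID are made up for illustration):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;

/** Prints the plan-file paths that live under a job's submit directory. */
public class SubmitDirLayout {
    public static void main(String[] args) {
        // The submit dir is <stagingDir>/<jobId>; a made-up example path is used here
        Path submitJobDir = new Path(args.length > 0 ? args[0]
            : "/tmp/hadoop-yarn/staging/user/.staging/job_1490000000000_0001");
        System.out.println(JobSubmissionFiles.getJobJar(submitJobDir));           // .../job.jar
        System.out.println(JobSubmissionFiles.getJobSplitFile(submitJobDir));     // .../job.split
        System.out.println(JobSubmissionFiles.getJobSplitMetaFile(submitJobDir)); // .../job.splitmetainfo
        System.out.println(JobSubmissionFiles.getJobConfPath(submitJobDir));      // .../job.xml
    }
}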
3.4 writeSplits
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
        Path jobSubmitDir) throws IOException,
        InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf) job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
        // Step in here: the new-api path
        maps = writeNewSplits(job, jobSubmitDir);
    } else {
        maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}
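getUseNewMapper() just reads back the flag that setUseNewAPI() wrote during submit(); as a sketch of my reading of JobConf (the key name is the one JobConf uses internally, treated here as an assumption):

// Sketch of the flag consulted by writeSplits
public boolean getUseNewMapper() {
    // Set to true by Job.setUseNewAPI() when the job was configured through the new API
    return getBoolean("mapred.mapper.new-api", false);
}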
3.5 writeNewSplits
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
        InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // Use reflection to instantiate the InputFormat class configured for the job
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    // Compute the split information
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    // Write the split information into the job.split file under jobSubmitDir
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}
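When the driver does not set an InputFormat explicitly, as in the example at the top, job.getInputFormatClass() falls back to TextInputFormat. A simplified sketch of that lookup in JobContextImpl (the constant comes from MRJobConfig):

// Sketch of JobContextImpl.getInputFormatClass()
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?, ?>> getInputFormatClass()
        throws ClassNotFoundException {
    // "mapreduce.job.inputformat.class"; defaults to TextInputFormat when unset
    return (Class<? extends InputFormat<?, ?>>)
        conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}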
3.6 getSplits
public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    // Minimum split size
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // Maximum split size
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
                // Block size of the file
                long blockSize = file.getBlockSize();
                // Compute the split size
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // not splitable
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
            }
        } else {
            // Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
            + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
}
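Note the SPLIT_SLOP factor (1.1 in FileInputFormat): a tail smaller than 10% of the split size is folded into the previous split instead of becoming a tiny split of its own. A small standalone illustration of the loop above; the 260 MB file size and 128 MB split size are made-up numbers for the example:

public class SplitSlopDemo {
    public static void main(String[] args) {
        final double SPLIT_SLOP = 1.1;            // same constant as FileInputFormat
        long splitSize = 128L * 1024 * 1024;      // assume splitSize == blockSize == 128 MB
        long bytesRemaining = 260L * 1024 * 1024; // a hypothetical 260 MB input file

        int count = 0;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            bytesRemaining -= splitSize;
            count++;
        }
        if (bytesRemaining != 0) {
            count++; // the remaining 132 MB becomes the last split
        }
        // Prints 2: splits of 128 MB and 132 MB, because 132/128 <= 1.1
        System.out.println(count);
    }
}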
3.7 computeSplitSize
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
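So with the defaults, the split size equals the file's block size. If you need a different split size, FileInputFormat exposes setters for the min and max values that feed into this formula; the snippet below would go in the driver above, and the 64 MB / 256 MB figures are only illustrative:

// Lowering the max split size below the block size yields more, smaller splits:
// splitSize = max(minSize, min(64 MB, blockSize)) = 64 MB
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

// Alternatively, raising the min split size above the block size yields fewer, larger splits:
// splitSize = max(256 MB, min(maxSize, blockSize)) = 256 MB
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);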
3.8 submitJobInternal
Back in the submitJobInternal method:
    // Write the job plan files: job.split, wc.jar ...
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    // Submit the job
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
        return status;
    } else {
        throw new IOException("Could not launch job");
    }
} finally {
    if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
            // Delete the plan files from the staging area
            jtFs.delete(submitJobDir, true);
    }
}
That concludes the walkthrough of the client-side submission flow; for more detail, it is worth reading further into the source.