Using Counters in MapReduce

Introduction:

   MapReduce Counters give us a window into a MapReduce job while it runs, exposing detailed statistics about its execution. MapReduce ships with many built-in Counters.

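    Counters are organized into groups; a group collects all the values that belong to the same logical scope. The built-in Counters of a MapReduce job fall into three groups: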

  1. Map-Reduce Framework
    Map input records, Map skipped records, Map input bytes, Map output records, Map output bytes, Combine input records, Combine output records, Reduce input records, Reduce input groups, Reduce output records, Reduce skipped groups, Reduce skipped records, Spilled records
  2. File Systems
    FileSystem bytes read, FileSystem bytes written
  3. Job Counters
    Launched map tasks, Launched reduce tasks, Failed map tasks, Failed reduce tasks, Data-local map tasks, Rack-local map tasks, Other local map tasks
        You can see these counters in the Web UI, or in the summary report printed to the console when the job finishes. The exact counter names vary between Hadoop versions, as the following MR run log shows:


-bash-4.1$ hadoop jar mr.jar com.catt.cdh.mr.CountRecords
13/11/29 11:38:04 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
13/11/29 11:38:10 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/11/29 11:38:11 INFO input.FileInputFormat: Total input paths to process : 1
13/11/29 11:38:11 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
13/11/29 11:38:11 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 6298911ef75545c61859c08add6a74a83e0183ad]
13/11/29 11:38:12 INFO mapred.JobClient: Running job: job_201311251130_0208
13/11/29 11:38:13 INFO mapred.JobClient:  map 0% reduce 0%
13/11/29 11:38:40 INFO mapred.JobClient:  map 100% reduce 0%
13/11/29 11:38:49 INFO mapred.JobClient:  map 100% reduce 100%
13/11/29 11:38:57 INFO mapred.JobClient: Job complete: job_201311251130_0208
13/11/29 11:38:57 INFO mapred.JobClient: Counters: 32
13/11/29 11:38:57 INFO mapred.JobClient:   File System Counters
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of bytes read=36
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of bytes written=322478
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     FILE: Number of write operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of bytes read=139
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of bytes written=7
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of read operations=2
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of large read operations=0
13/11/29 11:38:57 INFO mapred.JobClient:     HDFS: Number of write operations=1
13/11/29 11:38:57 INFO mapred.JobClient:   Job Counters 
13/11/29 11:38:57 INFO mapred.JobClient:     Launched map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Launched reduce tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Data-local map tasks=1
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=31068
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=6671
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/11/29 11:38:57 INFO mapred.JobClient:   Map-Reduce Framework
13/11/29 11:38:57 INFO mapred.JobClient:     Map input records=13
13/11/29 11:38:57 INFO mapred.JobClient:     Map output records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Map output bytes=14
13/11/29 11:38:57 INFO mapred.JobClient:     Input split bytes=103
13/11/29 11:38:57 INFO mapred.JobClient:     Combine input records=0
13/11/29 11:38:57 INFO mapred.JobClient:     Combine output records=0
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce input groups=1
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce shuffle bytes=32
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce input records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Reduce output records=1
13/11/29 11:38:57 INFO mapred.JobClient:     Spilled Records=2
13/11/29 11:38:57 INFO mapred.JobClient:     CPU time spent (ms)=4780
13/11/29 11:38:57 INFO mapred.JobClient:     Physical memory (bytes) snapshot=657629184
13/11/29 11:38:57 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3802001408
13/11/29 11:38:57 INFO mapred.JobClient:     Total committed heap usage (bytes)=1915486208
13/11/29 11:38:57 INFO mr.CountRecords: sum     13
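
The console report above is plain text, so it can be turned back into group / counter / value triples when you only have a saved log. The following is a minimal sketch, not part of Hadoop: the class CounterLogParser and its parse method are hypothetical names, and the sketch assumes the log layout shown above (a "Counters: N" line, then group headers and "name=value" lines, all emitted by mapred.JobClient).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Hadoop class): rebuilds counter groups from JobClient log lines. */
class CounterLogParser {
    // Captures the trimmed text that follows "mapred.JobClient:" on a log line.
    private static final Pattern JOB_CLIENT = Pattern.compile("mapred\\.JobClient:\\s*(.*?)\\s*$");
    // The "Counters: 32" line marks the start of the counter report.
    private static final Pattern SUMMARY = Pattern.compile("Counters: \\d+");
    // A counter line looks like "Map input records=13".
    private static final Pattern COUNTER = Pattern.compile("(.+)=(\\d+)");

    static Map<String, Map<String, Long>> parse(List<String> lines) {
        Map<String, Map<String, Long>> groups = new LinkedHashMap<>();
        Map<String, Long> current = null;
        boolean inReport = false;
        for (String line : lines) {
            Matcher m = JOB_CLIENT.matcher(line);
            if (!m.find()) {
                continue; // line written by some other logger
            }
            String body = m.group(1);
            if (!inReport) {
                inReport = SUMMARY.matcher(body).matches();
                continue;
            }
            Matcher c = COUNTER.matcher(body);
            if (c.matches()) {
                if (current != null) {
                    current.put(c.group(1), Long.parseLong(c.group(2)));
                }
            } else {
                // Inside the report, any line without "=value" is a group header.
                current = new LinkedHashMap<>();
                groups.put(body, current);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "13/11/29 11:38:57 INFO mapred.JobClient: Counters: 32",
            "13/11/29 11:38:57 INFO mapred.JobClient:   Job Counters ",
            "13/11/29 11:38:57 INFO mapred.JobClient:     Launched map tasks=1",
            "13/11/29 11:38:57 INFO mapred.JobClient:   Map-Reduce Framework",
            "13/11/29 11:38:57 INFO mapred.JobClient:     Map input records=13");
        System.out.println(parse(lines));
    }
}
```

Inside a driver program it is simpler to read the values from job.getCounters(), as the CounterTest example in the next section does; parsing text is only useful when the log is all that is left.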


Defining custom Counters with a Java Enum

A Counter can be backed by any Enum class, as in the following code example:

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

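/*
 * Defining custom Counters with a Java Enum.
 * A Counter can be backed by any Enum type.
 * Suppose each line of a file records the duration of one user session online.
 * The code below counts the sessions longer than 30 minutes and the sessions
 * of 30 minutes or less. The final counts are printed to the terminal.
 */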
public class CounterTest extends Configured implements Tool {
	private final static Log log = LogFactory.getLog(CounterTest.class);

	public static void main(String[] args) throws Exception {
		String[] ars = new String[] { "hdfs://data2.kt:8020/test/input",
				"hdfs://data2.kt:8020/test/output" };
		int exitcode = ToolRunner.run(new CounterTest(), ars);
		System.exit(exitcode);
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		// fs.default.name is deprecated (see the WARN line in the run log); fs.defaultFS replaces it
		conf.set("fs.defaultFS", "hdfs://data2.kt:8020/");
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(args[1]), true);

		// Pass conf to the Job; a no-argument Job would ignore the settings made above
		Job job = new Job(conf);
		job.setJarByClass(CounterTest.class);

		job.setMapperClass(MyMap.class);
		job.setNumReduceTasks(0);

		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		int result = job.waitForCompletion(true) ? 0 : 1;
		
		// Read the custom Counter back and log its value and names
		Counters counters = job.getCounters();
		Counter counter1=counters.findCounter(NetTimeLong.OVER30M);
		log.info(counter1.getValue());
		log.info(counter1.getDisplayName()+","+counter1.getName());
		
		return result;
	}

	public static class MyMap extends
			Mapper<LongWritable, Text, NullWritable, Text> {
		private Counter counter1, counter2;

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Each input line holds one session duration in minutes
			double minutes = Double.parseDouble(value.toString());
			if (minutes <= 30) {
				// getCounter creates the counter automatically if it does not exist yet
				counter2 = context.getCounter(NetTimeLong.LOW30M);
				counter2.increment(1);
			} else if (minutes > 30) {
				counter1 = context.getCounter(NetTimeLong.OVER30M);
				counter1.increment(1);
			}
			context.write(NullWritable.get(), value);
		}
	}
}

enum NetTimeLong {
	OVER30M, LOW30M
}
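
The counting rule in MyMap.map() can be checked without a cluster. The sketch below is Hadoop-free and purely illustrative: SessionCounterSketch, classify and count are hypothetical names, and an EnumMap stands in for the framework's Counters. It mirrors the mapper's branch for ordinary numeric input and also prints the two strings Hadoop uses to file an enum counter: the enum's class name as the group and the constant's name as the counter name.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hadoop-free sketch of the mapper's counting rule; all names here are hypothetical. */
class SessionCounterSketch {
    enum NetTimeLong { OVER30M, LOW30M }

    /** 30 minutes or less is LOW30M, anything longer is OVER30M. */
    static NetTimeLong classify(double minutes) {
        return minutes <= 30 ? NetTimeLong.LOW30M : NetTimeLong.OVER30M;
    }

    /** Counts one session per input line, the way the mapper increments its counters. */
    static Map<NetTimeLong, Long> count(String[] lines) {
        Map<NetTimeLong, Long> counts = new EnumMap<>(NetTimeLong.class);
        for (NetTimeLong key : NetTimeLong.values()) {
            counts.put(key, 0L);
        }
        for (String line : lines) {
            NetTimeLong key = classify(Double.parseDouble(line.trim()));
            counts.put(key, counts.get(key) + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<NetTimeLong, Long> counts = count(new String[] {"12.5", "30", "45", "90"});
        System.out.println(counts); // prints {OVER30M=2, LOW30M=2}

        // Group and counter name that an enum-backed counter is registered under
        NetTimeLong c = NetTimeLong.OVER30M;
        System.out.println(c.getDeclaringClass().getName() + " / " + c.name());
    }
}
```

Note the boundary: a session of exactly 30 minutes is counted as LOW30M, matching the `<= 30` test in the mapper.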

