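Create the MapperTask
Create a Java class that extends the Mapper parent class.
Notes on the generic type parameters
Note that the data travels over the network, so every type must be serializable.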
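    package com.bobo.mr.wc;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * Note: the data travels over the network, so every type must be serializable.
     *
     * KEYIN:    offset of the line being read (the default input key); long -> LongWritable
     * VALUEIN:  content of the line being read (the default input value); String -> Text
     *
     * KEYOUT:   key returned after the user's processing; String -> Text
     * VALUEOUT: value returned after the user's processing; int -> IntWritable
     *
     * @author 波波烤鸭
     * dengpbs@163.com
     */
    public class MyMapperTask extends Mapper<LongWritable, Text, Text, IntWritable> {

        /**
         * The business logic of the map phase goes in the map method.
         * By default it is called once for every line that is read.
         *
         * @param key   offset of the line
         * @param value content of the line
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Split the line into words on spaces
            String[] words = line.split(" ");
            for (String word : words) {
                // Emit the word as the key and 1 as the value so the data can be distributed by word later
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }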
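The serialization requirement in the class comment is the reason the type parameters are Hadoop wrapper types such as LongWritable and Text rather than plain long and String. As a rough illustration that does not depend on Hadoop, the sketch below imitates the shape of Hadoop's Writable contract (a `write(DataOutput)` method paired with a `readFields(DataInput)` method) using only the JDK. The names `WritableSketch` and `WordCountPair` are hypothetical and exist only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WritableSketch {

    /** Hypothetical (word, count) pair; a stand-in, not a Hadoop class. */
    static class WordCountPair {
        String word;
        int count;

        WordCountPair() { }

        WordCountPair(String word, int count) {
            this.word = word;
            this.count = count;
        }

        // Same shape as Writable.write: fields are written in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeUTF(word);
            out.writeInt(count);
        }

        // Same shape as Writable.readFields: fields are read back in the same order.
        void readFields(DataInput in) throws IOException {
            word = in.readUTF();
            count = in.readInt();
        }
    }

    /** Turns a pair into bytes, as must happen before it can cross the network. */
    static byte[] toBytes(WordCountPair pair) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            pair.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds a pair from bytes on the receiving side. */
    static WordCountPair fromBytes(byte[] bytes) {
        WordCountPair pair = new WordCountPair();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            pair.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pair;
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new WordCountPair("hello", 1));
        WordCountPair copy = fromBytes(bytes);
        System.out.println(copy.word + " " + copy.count + " (" + bytes.length + " bytes)");
    }
}
```

In the real job none of this has to be written by hand, because Text and IntWritable already provide their own serialization.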
Create the ReduceTask
Create a Java class that extends the Reducer parent class.
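    package com.bobo.mr.wc;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * KEYIN and VALUEIN correspond to the KEYOUT and VALUEOUT of the map phase.
     *
     * KEYOUT:   output key type of the reduce logic (Text here)
     * VALUEOUT: output value type of the reduce logic (IntWritable here)
     *
     * @author 波波烤鸭
     * dengpbs@163.com
     */
    public class MyReducerTask extends Reducer<Text, IntWritable, Text, IntWritable> {

        /**
         * @param key     a key emitted by the map phase
         * @param values  all values emitted by the map phase for that key
         * @param context the job context
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            // Count the occurrences of the word that share this key
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }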
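To see how the mapper and the reducer cooperate without a cluster, the flow can be imitated in a single process. The sketch below is an approximation under stated assumptions: a TreeMap plays the role of the shuffle (grouping values by key and sorting the keys), the input lines are made-up sample data, and `LocalWordCountSketch` with its methods is a hypothetical name rather than a Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalWordCountSketch {

    /** Map + shuffle: every word emits a 1, and the 1s are grouped under their word. */
    static Map<String, List<Integer>> mapAndShuffle(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            // Same tokenization as MyMapperTask: split on single spaces
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        }
        return grouped;
    }

    /** Reduce: sum all values that arrived for one key, as MyReducerTask does. */
    static int reduce(Iterable<Integer> values) {
        int count = 0;
        for (int value : values) {
            count += value;
        }
        return count;
    }

    /** Runs the imitation end to end and returns word -> count, sorted by word. */
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : mapAndShuffle(lines).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample input, not the contents of a.txt or b.txt
        List<String> lines = Arrays.asList("hello java", "hello spring");
        System.out.println(wordCount(lines));
    }
}
```

One detail worth knowing: because the split is on a single space, two consecutive spaces in a line produce an empty string, which would then be counted as a word of its own.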
Create the driver class
    package com.bobo.mr.wc;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WcTest {

        public static void main(String[] args) throws Exception {
            // Create the configuration object (true loads the default resources)
            Configuration conf = new Configuration(true);

            // Get the Job object
            Job job = Job.getInstance(conf);
            // Set the class used to locate the job jar
            job.setJarByClass(WcTest.class);

            // Specify the classes that handle the map phase and the reduce phase
            job.setMapperClass(MyMapperTask.class);
            job.setReducerClass(MyReducerTask.class);

            // Specify the output types of the map phase
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Specify the final (reduce) output types as well
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Input and output paths of the job, passed in as arguments
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Submit the job, wait for it to finish, and exit non-zero on failure
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        }
    }
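Package and deploy
Package the project as a jar with Maven.
Upload and test
Create the directory for the wordcount example in HDFS and upload the test input files.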
    hadoop fs -mkdir -p /hdfs/wordcount/input
    hadoop fs -put a.txt b.txt /hdfs/wordcount/input/
Run the program
    hadoop jar hadoop-demo-0.0.1-SNAPSHOT.jar com.bobo.mr.wc.WcTest /hdfs/wordcount/input /hdfs/wordcount/output/
The job runs successfully
    [root@hadoop-node01 ~]# hadoop jar hadoop-demo-0.0.1-SNAPSHOT.jar com.bobo.mr.wc.WcTest /hdfs/wordcount/input /hdfs/wordcount/output/
    19/04/03 16:56:43 INFO client.RMProxy: Connecting to ResourceManager at hadoop-node01/192.168.88.61:8032
    19/04/03 16:56:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    19/04/03 16:56:48 INFO input.FileInputFormat: Total input paths to process : 2
    19/04/03 16:56:49 INFO mapreduce.JobSubmitter: number of splits:2
    19/04/03 16:56:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1554281786018_0001
    19/04/03 16:56:52 INFO impl.YarnClientImpl: Submitted application application_1554281786018_0001
    19/04/03 16:56:53 INFO mapreduce.Job: The url to track the job: http://hadoop-node01:8088/proxy/application_1554281786018_0001/
    19/04/03 16:56:53 INFO mapreduce.Job: Running job: job_1554281786018_0001
    19/04/03 16:57:14 INFO mapreduce.Job: Job job_1554281786018_0001 running in uber mode : false
    19/04/03 16:57:14 INFO mapreduce.Job:  map 0% reduce 0%
    19/04/03 16:57:38 INFO mapreduce.Job:  map 100% reduce 0%
    19/04/03 16:57:56 INFO mapreduce.Job:  map 100% reduce 100%
    19/04/03 16:57:57 INFO mapreduce.Job: Job job_1554281786018_0001 completed successfully
    19/04/03 16:57:57 INFO mapreduce.Job: Counters: 50
        File System Counters
            FILE: Number of bytes read=181
            FILE: Number of bytes written=321388
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=325
            HDFS: Number of bytes written=87
            HDFS: Number of read operations=9
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=2
        Job Counters
            Launched map tasks=2
            Launched reduce tasks=1
            Data-local map tasks=1
            Rack-local map tasks=1
            Total time spent by all maps in occupied slots (ms)=46511
            Total time spent by all reduces in occupied slots (ms)=12763
            Total time spent by all map tasks (ms)=46511
            Total time spent by all reduce tasks (ms)=12763
            Total vcore-milliseconds taken by all map tasks=46511
            Total vcore-milliseconds taken by all reduce tasks=12763
            Total megabyte-milliseconds taken by all map tasks=47627264
            Total megabyte-milliseconds taken by all reduce tasks=13069312
        Map-Reduce Framework
            Map input records=14
            Map output records=14
            Map output bytes=147
            Map output materialized bytes=187
            Input split bytes=234
            Combine input records=0
            Combine output records=0
            Reduce input groups=10
            Reduce shuffle bytes=187
            Reduce input records=14
            Reduce output records=10
            Spilled Records=28
            Shuffled Maps =2
            Failed Shuffles=0
            Merged Map outputs=2
            GC time elapsed (ms)=1049
            CPU time spent (ms)=5040
            Physical memory (bytes) snapshot=343056384
            Virtual memory (bytes) snapshot=6182891520
            Total committed heap usage (bytes)=251813888
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters
            Bytes Read=91
        File Output Format Counters
            Bytes Written=87
View the result
    [root@hadoop-node01 ~]# hadoop fs -cat /hdfs/wordcount/output/part-r-00000
    ajax 1
    bobo烤鸭 1
    hello 2
    java 2
    mybatis 1
    name 1
    php 1
    shell 2
    spring 2
    springmvc 1
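The result can be cross-checked against the counters in the job log: the number of output lines should equal "Reduce output records" (10), and the counts should add up to "Map output records" (14), since every emitted (word, 1) pair ends up in exactly one sum. The sketch below does that arithmetic in plain Java. It assumes the output file separates key and value with a tab, which is the default of Hadoop's TextOutputFormat; `OutputCheckSketch` and its members are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

public class OutputCheckSketch {

    // The lines of part-r-00000 from the listing above, with an assumed tab separator
    static final List<String> RESULT_LINES = Arrays.asList(
            "ajax\t1", "bobo烤鸭\t1", "hello\t2", "java\t2", "mybatis\t1",
            "name\t1", "php\t1", "shell\t2", "spring\t2", "springmvc\t1");

    /** Sums the count column of "word<TAB>count" lines. */
    static int totalCount(List<String> lines) {
        int total = 0;
        for (String line : lines) {
            String[] fields = line.split("\t");
            total += Integer.parseInt(fields[1].trim());
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("distinct words: " + RESULT_LINES.size());
        System.out.println("total words:    " + totalCount(RESULT_LINES));
    }
}
```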
OK~