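MapReduce Programming: Custom Partitioning and Custom Counters
I. Objectives
- Become fluent in writing the Mapper class, the Reducer class, and the main function
- Learn the basic usage of local aggregation in the MapReduce framework
- Learn how to implement custom partitioning in MapReduce programs
- Learn the basic usage of the partitioner in the MapReduce framework and the effect it has on the output
- Learn how to implement custom counters in MapReduce programs
II. Requirements and Notes
- For each task, give the main steps, the implementation code, and screenshots of the test results.
- Write a thorough summary and analysis of the work done in this lab.
- Every program must be tested both locally and on the cluster, with screenshots of each.
- Rename the project, classes, or packages so that they show your student ID or name.
III. Tasks and Steps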
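Task 1: Custom partitioning. In the MapReduce framework, a partitioner can route data that belongs together to the same output file. Write a MapReduce program that reads a text file, counts the words in it, and uses 2 partitions: words whose first letter is "h" go to the first partition, and all other words go to the second. See Figure 1 for the expected result:
Main implementation steps and screenshots of the results:
(1) Log in to the virtual machine, start the Hadoop cluster, and upload the input file.
(2) Start the Eclipse client and create a new Java project; in the project, create a package, import the JAR files to complete the environment setup, and then create the Mapper class, the Reducer class, and the main class.
(3) Write the code.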
PartitionMap
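package hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (word, 1) for every word in the input line.
public class WjwPartitionMap extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on runs of whitespace so repeated spaces do not produce empty words.
        String[] words = value.toString().trim().split("\\s+");
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // a blank line yields a single empty token
            }
            context.write(new Text(word), new IntWritable(1));
        }
    }
}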
PartitionReduce
package hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the counts emitted for each word.
public class WjwPartitionReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Partitioner
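package hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes words that start with "h" to partition 0 and everything else to partition 1.
public class WjwPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String str = key.toString();
        // The check is case-sensitive: "Hello" goes to partition 1.
        if (str.startsWith("h")) {
            return 0;
        } else {
            return 1;
        }
    }
}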
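To make the routing rule easier to check without a cluster, here is a minimal plain-Java sketch of it. It does not use Hadoop at all: `PartitionSketch`, `partitionFor`, `outputFileFor`, and `route` are hypothetical names introduced only for illustration, and the sample words are made up. It assumes the usual `part-r-NNNNN` naming of reducer output files and shows raw words rather than the word counts the real job writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSketch {
    // Same rule as the custom partitioner: words starting with "h" go to partition 0.
    public static int partitionFor(String word) {
        return word.startsWith("h") ? 0 : 1;
    }

    // Name of the reducer output file for a partition (assumed part-r-NNNNN convention).
    public static String outputFileFor(int partition) {
        return String.format("part-r-%05d", partition);
    }

    // Groups words by the output file their partition would end up in.
    public static Map<String, List<String>> route(String[] words) {
        Map<String, List<String>> files = new TreeMap<>();
        for (String word : words) {
            String file = outputFileFor(partitionFor(word));
            files.computeIfAbsent(file, k -> new ArrayList<>()).add(word);
        }
        return files;
    }

    public static void main(String[] args) {
        // Made-up sample words; "Hive" starts with an uppercase H.
        String[] sample = {"hello", "hadoop", "world", "Hive", "map"};
        // prints {part-r-00000=[hello, hadoop], part-r-00001=[world, Hive, map]}
        System.out.println(route(sample));
    }
}
```

Note that "Hive" lands in the second file because `startsWith("h")` is case-sensitive; lowercasing the key before the check would be one way to treat both cases alike, if that is what the task intends.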
PartitionMain
package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WjwPartitionMain {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Expect exactly two arguments: the input path and the output path.
        if (args == null || args.length != 2) {
            System.err.println("Usage: WjwPartitionMain <input path> <output path>");
            System.exit(2); // stop here instead of failing later on args[0]
        }
        Job job = Job.getInstance(new Configuration(), "WjwPartitionMain");
        job.setJarByClass(WjwPartitionMain.class);
        job.setMapperClass(WjwPartitionMap.class);
        job.setReducerClass(WjwPartitionReduce.class);
        // Two reduce tasks, one for each partition returned by WjwPartitioner.
        job.setNumReduceTasks(2);
        job.setPartitionerClass(WjwPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Exit with a non-zero status if the job fails.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
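(4) Test the program and check the output.
Task 2: Custom counter. Write a MapReduce program that uses a custom counter to count how many times the uid "88f8ec0baee999cb1f30ffecda27cd86" appears in sogou.txt. The expected result is shown in Figure 2.
Main implementation steps and screenshots of the results:
(1) Log in to the virtual machine, start the Hadoop cluster, and upload the input file.
(2) Start the Eclipse client and create a new Java project; in the project, create a package, import the JAR files to complete the environment setup, and then create the Mapper class, the Reducer class, and the main class.
(3) Write the code.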
CountMap
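package hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

// Counts records whose uid (second tab-separated field) matches the target uid.
public class WjwCountMap extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split("\t");
        if (arr.length < 2) {
            return; // skip lines without a uid field instead of throwing
        }
        String uid = arr[1];
        // Custom counter identified by group "zidingyi" and name "uid-counter".
        Counter mycounter = context.getCounter("zidingyi", "uid-counter");
        if ("88f8ec0baee999cb1f30ffecda27cd86".equals(uid)) {
            mycounter.increment(1);
            context.write(new Text(uid), new IntWritable(1));
        }
    }
}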
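The counting logic in the mapper can be tried out without Hadoop. The sketch below is only a stand-in under stated assumptions: `CounterSketch` and its methods are hypothetical, a `HashMap` keyed by "group:name" plays the role of the framework's counters, the `malformed-lines` counter is an illustrative addition that is not in the original code, and the sample rows are invented rather than taken from sogou.txt.

```java
import java.util.HashMap;
import java.util.Map;

public class CounterSketch {
    static final String TARGET_UID = "88f8ec0baee999cb1f30ffecda27cd86";

    // Stand-in for Hadoop counters: "group:name" -> current value.
    private final Map<String, Long> counters = new HashMap<>();

    public void increment(String group, String name, long delta) {
        counters.merge(group + ":" + name, delta, Long::sum);
    }

    public long value(String group, String name) {
        return counters.getOrDefault(group + ":" + name, 0L);
    }

    // Mirrors the mapper's per-line logic: the uid is the second tab-separated field.
    public void processLine(String line) {
        String[] fields = line.split("\t");
        if (fields.length < 2) {
            // Hypothetical extra counter for lines that have no uid field.
            increment("zidingyi", "malformed-lines", 1);
            return;
        }
        if (TARGET_UID.equals(fields[1])) {
            increment("zidingyi", "uid-counter", 1);
        }
    }

    public static void main(String[] args) {
        // Invented sample rows in an assumed "time<TAB>uid<TAB>query" layout.
        String[] lines = {
            "t1\t88f8ec0baee999cb1f30ffecda27cd86\tquery-a",
            "t2\tsome-other-uid\tquery-b",
            "t3\t88f8ec0baee999cb1f30ffecda27cd86\tquery-c",
            "line-without-tabs"
        };
        CounterSketch sketch = new CounterSketch();
        for (String line : lines) {
            sketch.processLine(line);
        }
        System.out.println("uid-counter = " + sketch.value("zidingyi", "uid-counter"));
        System.out.println("malformed-lines = " + sketch.value("zidingyi", "malformed-lines"));
    }
}
```

In the real job the framework aggregates counter values across all map tasks and reports them with the job's final status output, so the counter can be read there instead of from the output files.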
CountReduce
package hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the 1s emitted by the mapper for the matching uid.
public class WjwCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            // Add the value itself rather than 1, so the result stays correct
            // even if the values have been pre-aggregated.
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
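One detail about the reducer is worth a small illustration. The job in this lab does not register a combiner, so every value the reducer sees is 1, and counting the values gives the same answer as summing them. If local aggregation were added later (for example with `job.setCombinerClass`), the reducer could receive partial sums instead, and only summing would stay correct. The plain-Java sketch below uses hypothetical names (`ReduceSumSketch`, `countElements`, `sumValues`) and made-up value lists to show the difference.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSumSketch {
    // Counts how many values arrived, ignoring what each value is.
    public static int countElements(List<Integer> values) {
        int sum = 0;
        for (Integer ignored : values) {
            sum += 1;
        }
        return sum;
    }

    // Adds up the values themselves.
    public static int sumValues(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Five matching records with no local aggregation: the reducer sees five 1s.
        List<Integer> raw = Arrays.asList(1, 1, 1, 1, 1);
        // The same five records after two map tasks pre-aggregated 3 and 2 of them.
        List<Integer> combined = Arrays.asList(3, 2);

        System.out.println(countElements(raw) + " " + sumValues(raw));           // prints "5 5"
        System.out.println(countElements(combined) + " " + sumValues(combined)); // prints "2 5"
    }
}
```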
CountMain
package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WjwCountMain {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Expect exactly two arguments: the input path and the output path.
        if (args == null || args.length != 2) {
            System.err.println("Usage: WjwCountMain <input path> <output path>");
            System.exit(2); // stop here instead of failing later on args[0]
        }
        Job job = Job.getInstance(new Configuration(), "WjwCountMain");
        job.setJarByClass(WjwCountMain.class);
        job.setMapperClass(WjwCountMap.class);
        job.setReducerClass(WjwCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Exit with a non-zero status if the job fails.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4) Test the program and check the output.
Appendix: Articles in This Series
| Lab | Article | Link |
| --- | --- | --- |
| Lab 01 | Hadoop Installation and Deployment | https://want595.blog.csdn.net/article/details/132767284 |
| Lab 02 | Common HDFS Shell Commands | https://want595.blog.csdn.net/article/details/132863345 |
| Lab 03 | Reading Files with Hadoop | https://want595.blog.csdn.net/article/details/132912077 |
| Lab 04 | Creating and Writing HDFS Files | https://want595.blog.csdn.net/article/details/133168180 |
| Lab 05 | Creating, Deleting, and Querying HDFS Directories and Files | https://want595.blog.csdn.net/article/details/133168734 |
| Lab 06 | SequenceFile, Metadata Operations, and MapReduce Word Count | https://want595.blog.csdn.net/article/details/133926246 |
| Lab 07 | MapReduce Programming: Data Filtering and Saving, UID Deduplication | https://want595.blog.csdn.net/article/details/133947981 |
| Lab 08 | MapReduce Programming: Retrieving Search Records of a Specific Group and Defining Input Splits | https://want595.blog.csdn.net/article/details/133948849 |
| Lab 09 | MapReduce Programming: Join and Aggregation Operations | https://want595.blog.csdn.net/article/details/133949148 |
| Lab 10 | MapReduce Programming: Custom Partitioning and Custom Counters | https://want595.blog.csdn.net/article/details/133949522 |