MapReduce编程:数据过滤保存、UID 去重
一、实验目标
- 熟练掌握Mapper类,Reducer类和main函数的编写
- 熟练掌握程序的本地测试方法
- 熟练掌握在集群上进行分布式程序测试的方法
- 掌握用户UID去重实现方法
- 掌握MapReduce数据过滤方法
二、实验要求及注意事项
- 给出每个实验的主要实验步骤、实现代码和测试效果截图。
- 对本次实验工作进行全面的总结分析。
- 所有程序需要本地测试和集群测试,给出相应截图。
- 建议工程名,类名或包名等做适当修改,显示个人学号或者姓名
- 注意:运行程序前需先将sogou.txt文件上传到HDFS
三、实验内容及步骤
实验任务1:数据过滤及保存,输入文件为搜狗日志文件(公共目录下数据文件/experiment/sogou.txt上传到HDFS的根目录),对输入的每行数据做判断,只把搜索的关键词中包含数字的用户uid和关键词输出到HDFS上。展示结果为成功过滤出搜索关键词包含数字的用户ID和其搜索的关键词,如图所示:
主要实现步骤和运行效果图:
(1)进入虚拟机并启动Hadoop集群,完成文件上传。
(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包、Mapper类,Reducer类和主类等;
(3)完成代码编写。
Map
package com.wjw.hadoop;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper for the keyword-filter job.
 *
 * <p>Each input line is split on tab characters. Lines that do not yield
 * exactly six fields are skipped. For the remaining lines, field 1 is taken as
 * the uid and field 2 as the search keyword; the pair (uid, keyword) is emitted
 * only when the keyword contains at least one digit.
 */
public class WjwMap extends Mapper<Object, Text, Text, Text> {

    /** Number of tab-separated fields a line must have to be processed. */
    private static final int FIELD_COUNT = 6;

    /** Compiled once instead of once per record as String.matches would do. */
    private static final Pattern DIGIT = Pattern.compile("\\d");

    /**
     * Emits (uid, keyword) when the keyword of the given log line contains a digit.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one line of the log file
     * @param context sink for the (uid, keyword) output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // String.split never returns null, so only the field count needs checking.
        String[] fields = value.toString().split("\t");
        if (fields.length != FIELD_COUNT) {
            return;
        }

        String uid = fields[1];
        String keyword = fields[2];
        // find() looks for a digit anywhere in the keyword, which is what the
        // former ".*\\d+.*" full-match expressed.
        if (DIGIT.matcher(keyword).find()) {
            context.write(new Text(uid), new Text(keyword));
        }
    }
}
Reduce
package com.wjw.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer for the keyword-filter job.
 *
 * <p>Performs no aggregation: every value received for a key is written back
 * out paired with that same key.
 */
public class WjwReduce extends Reducer<Text, Text, Text, Text> {

    /**
     * Writes each value of the group to the output together with its key.
     *
     * @param key     the group key (the uid emitted by the mapper)
     * @param values  all values grouped under {@code key}
     * @param context sink for the output pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    // @Override makes the compiler reject a mistyped signature, which would
    // otherwise leave this method uncalled.
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text keyword : values) {
            context.write(key, keyword);
        }
    }
}
Main
package com.wjw.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the keyword-filter job: wires {@link WjwMap} and {@link WjwReduce}
 * into a job that reads from the input path and writes Text/Text pairs to the
 * output path.
 */
public class WjwMain {

    /**
     * Configures and runs the job, then exits the JVM.
     *
     * <p>Exit status is 0 when the job completes successfully and 1 otherwise
     * (including when the argument count is wrong).
     *
     * @param args exactly two entries: the input path and the output path
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (null == args || args.length != 2) {
            System.err.println("<Usage>:WjwMain need <input> <output>");
            System.exit(1);
        }
        String in = args[0];
        String out = args[1];

        Configuration conf = new Configuration();
        // Job.getInstance is the supported factory; the Job constructors are deprecated.
        Job job = Job.getInstance(conf, "WjwMain");
        job.setJarByClass(WjwMain.class);

        job.setMapperClass(WjwMap.class);
        job.setReducerClass(WjwReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4)测试程序,并查看输出结果。
实验任务2:MapReduce编程实现UID去重。使用MapReduce编程,统计并打印sogou.txt日志文件中所有去重后的独立用户UID,展示的结果为sogou.txt文件中所有去重后的用户UID。实现效果如图1所示:
主要实现步骤和运行效果图:
(1)进入虚拟机并启动Hadoop集群,完成文件上传。
(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包、Mapper类,Reducer类和主类等;
(3)完成代码编写。
UidMap
package com.wjw.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper for the uid-deduplication job.
 *
 * <p>Splits each input line on tab characters and emits field 1 (the uid) as
 * the key with a {@link NullWritable} value. Lines with fewer than two fields
 * are skipped.
 */
public class WjwUidMap extends Mapper<Object, Text, Text, NullWritable> {

    /** Minimum number of tab-separated fields needed for field 1 to exist. */
    private static final int MIN_FIELDS = 2;

    /**
     * Emits the uid of the given log line.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one line of the log file
     * @param context sink for the (uid, null) output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // A blank or malformed line has no field 1; indexing it unguarded would
        // throw ArrayIndexOutOfBoundsException, so skip such lines.
        if (fields.length < MIN_FIELDS) {
            return;
        }
        context.write(new Text(fields[1]), NullWritable.get());
    }
}
UidReduce
package com.wjw.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer for the uid-deduplication job.
 *
 * <p>Writes each key exactly once per {@code reduce} call and ignores the
 * grouped values, so repeated uids collapse into a single output record.
 */
public class WjwUidReduce extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Writes the key once with a {@link NullWritable} value.
     *
     * @param key     the uid for this group
     * @param values  the grouped values; not read
     * @param context sink for the output record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    // @Override makes the compiler reject a mistyped signature, which would
    // otherwise leave this method uncalled and the output not deduplicated.
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
UidMain
package com.wjw.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the uid-deduplication job: wires {@link WjwUidMap} and
 * {@link WjwUidReduce} into a job that reads from the input path and writes
 * Text/NullWritable records to the output path.
 */
public class WjwUidMain {

    /**
     * Configures and runs the job, then exits the JVM.
     *
     * <p>Exit status is 0 when the job completes successfully and 1 otherwise
     * (including when the argument count is wrong).
     *
     * @param args exactly two entries: the input path and the output path
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (null == args || args.length != 2) {
            System.err.println("<Usage>:WjwUidMain need <input> <output>");
            System.exit(1);
        }
        String in = args[0];
        String out = args[1];

        Configuration conf = new Configuration();
        // Job.getInstance is the supported factory; the Job constructors are deprecated.
        Job job = Job.getInstance(conf, "WjwUidMain");
        job.setJarByClass(WjwUidMain.class);

        job.setMapperClass(WjwUidMap.class);
        // The reducer's input and output types are identical and emitting a key
        // once is idempotent, so it can also run as a combiner to drop duplicate
        // uids before they are shuffled. The final output is unchanged.
        job.setCombinerClass(WjwUidReduce.class);
        job.setReducerClass(WjwUidReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4)测试程序,并查看输出结果。
实验任务3:使用Java API编程实现创建一个目录,删除一个目录,读文件和两个写文件的函数,其中一个写文件函数,用于将磁盘文件内容写入到HDFS一个自定义文件中,另一个写文件函数,将HDFS中一个目录下的文件内容写入到HDFS中另一个目录文件中。文件名字自拟。要求将所有函数封装到同一个类中,在主函数中调用,进行测试。
主要实现步骤和运行效果图:
(1)进入虚拟机并启动Hadoop集群,完成文件上传。
(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包和封装HDFS文件操作函数的主类等;
(3)完成代码编写。
Main
package hadoop;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Small collection of file-system operations (mkdir, delete, read, two kinds
 * of write) driven by a shared positional argument array:
 *
 * <ul>
 *   <li>{@code args[0]} - directory that is created first and deleted last</li>
 *   <li>{@code args[1]} - local file used as the source of {@link #Write1}</li>
 *   <li>{@code args[2]} - file written by {@link #Write1}, then read by
 *       {@link #Read} and {@link #Write2}</li>
 *   <li>{@code args[3]} - file written by {@link #Write2}</li>
 * </ul>
 *
 * <p>FileSystem instances are not closed here; only the streams opened by each
 * method are.
 */
public class WjwFile {

    /** Buffer size, in bytes, used for every stream copy. */
    private static final int BUFFER_SIZE = 4096;

    /** Number of positional arguments that main needs. */
    private static final int REQUIRED_ARGS = 4;

    /**
     * Copies the content of the file at {@code args[2]} to standard output.
     *
     * <p>The file system is resolved with {@code fs.defaultFS} set to
     * {@code hdfs://master:9000}.
     *
     * @param args argument array; {@code args[2]} is the path to read
     * @throws IOException if the file cannot be opened or read
     */
    public static void Read(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(conf);

        FSDataInputStream in = fs.open(new Path(args[2]));
        try {
            // close=false so that System.out is left open for later output.
            IOUtils.copyBytes(in, System.out, BUFFER_SIZE, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * Copies the local file {@code args[1]} into the file {@code args[2]} on
     * the file system identified by that path's URI.
     *
     * @param args argument array; {@code args[1]} is the local source and
     *             {@code args[2]} the destination path
     * @throws IOException if either file cannot be opened, or the copy or the
     *                     closing of the destination fails
     */
    public static void Write1(String[] args) throws IOException {
        Configuration conf = new Configuration();
        BufferedInputStream in = new BufferedInputStream(new FileInputStream(args[1]));
        try {
            FileSystem fs = FileSystem.get(URI.create(args[2]), conf);
            FSDataOutputStream out = fs.create(new Path(args[2]));
            // close=true closes both streams when the copy ends; the output
            // must be closed for the written file to be completed.
            IOUtils.copyBytes(in, out, BUFFER_SIZE, true);
        } finally {
            // Covers the case where opening the destination failed before the
            // copy (and therefore its own cleanup) was reached.
            IOUtils.closeStream(in);
        }
    }

    /**
     * Copies the file {@code args[2]} into the file {@code args[3]}; both paths
     * are accessed through the file system identified by the URI of
     * {@code args[3]}.
     *
     * @param args argument array; {@code args[2]} is the source path and
     *             {@code args[3]} the destination path
     * @throws IOException if either file cannot be opened, or the copy or the
     *                     closing of the destination fails
     */
    public static void Write2(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path source = new Path(args[2]);
        FileSystem fs = FileSystem.get(URI.create(args[3]), conf);

        BufferedInputStream in = new BufferedInputStream(fs.open(source));
        try {
            FSDataOutputStream out = fs.create(new Path(args[3]));
            // close=true closes both streams when the copy ends.
            IOUtils.copyBytes(in, out, BUFFER_SIZE, true);
        } finally {
            // Covers the case where creating the destination failed.
            IOUtils.closeStream(in);
        }
    }

    /**
     * Creates the directory {@code args[0]} (including missing parents).
     *
     * @param args argument array; {@code args[0]} is the directory to create
     * @throws IOException if the file system reports an error
     */
    public static void Mkdir(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        fs.mkdirs(new Path(args[0]));
    }

    /**
     * Recursively deletes the path {@code args[0]}.
     *
     * @param args argument array; {@code args[0]} is the path to delete
     * @throws IOException if the file system reports an error
     */
    public static void Delete(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        fs.delete(new Path(args[0]), true);
    }

    /**
     * Runs the operations in order: Mkdir, Write1, Read, Write2, Delete,
     * printing a progress line after each one.
     *
     * <p>Exits with status 1 and a usage message when fewer than four
     * arguments are supplied, so that no operation runs with a partial
     * argument list.
     *
     * @param args see the class description for the meaning of each position
     * @throws IOException if any of the operations fails
     */
    public static void main(String[] args) throws IOException {
        if (null == args || args.length < REQUIRED_ARGS) {
            System.err.println(
                    "<Usage>:WjwFile need <dir> <local-file> <hdfs-file> <hdfs-copy-file>");
            System.exit(1);
        }

        Mkdir(args);
        System.out.println("Directory created.");
        Write1(args);
        System.out.println("File1 written.");
        Read(args);
        System.out.println("File read.");
        Write2(args);
        System.out.println("File2 written.");
        Delete(args);
        System.out.println("Directory deleted.");
    }
}
(4)测试程序,并查看输出结果。