1. 数据去重
数据去重主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。
2. 实例描述
对数据文件中的数据进行去重。数据文件中的每行都是一个数据。比如原始输入数据为:
- File1:
2017-3-1 a
2017-3-2 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-7 c
2017-3-3 c - File2:
2017-3-1 b
2017-3-2 a
2017-3-3 b
2017-3-4 d
2017-3-5 a
2017-3-6 c
2017-3-7 d
2017-3-3 c - 输出结果为:
2017-3-1 a
2017-3-1 b
2017-3-2 a
2017-3-2 b
2017-3-3 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-6 c
2017-3-7 c
2017-3-7 d
3. 设计思路
数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台 reduce 机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是 reduce 的输入应该以数据作为 key,而对 value-list 则没有要求。当 reduce 接收到一个<key, value-list>
时就直接将 key 复制到输出的 key 中,并将 value 设置成空值。
在 MapReduce 流程中, map 的输出<key,value>
经过 shuffle 过程聚集成<key,value-list>
后会交给 reduce。所以从设计好的 reduce 输入可以反推出 map 的输出 key应为数据, value 任意。继续反推,map 输出数据的 key 为数据,而在这个实例中每个数据代表输入文件中的一行内容,所以 map 阶段要完成的任务就是在采用 Hadoop 默认的作业输入方式之后,将 value 设置为 key,并直接输出(输出中的 value 任意)。 map 中的结果经过 shuffle 过程之后交给 reduce。 reduce 阶段不会管每个 key 有多少个value,它直接将输入的 key 复制为输出的key,并输出就可以了(输出中的 value 被设置成空了)。
4. 程序代码
Map程序
package cn.itcast.hadoop.mr.dedup; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private static Text field = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { field = value; context.write(field, NullWritable.get()); } }
- reduce程序
package cn.itcast.hadoop.mr.dedup; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable>{ @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } }
- 主程序
package cn.itcast.hadoop.mr.dedup; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class DedupRunner { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(DedupRunner.class); job.setMapperClass(DedupMapper.class); job.setReducerClass(DedupReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("D:\\Dedup\\input")); // 指定处理完成之后的结果所保存的位置 FileOutputFormat.setOutputPath(job, new Path("D:\\Dedup\\output")); job.waitForCompletion(true); } }