0. Inverted index reference:
http://blog.csdn.net/pzasdq/article/details/51442856
1. Three log source files:
a.txt
hello tom
hello jerry
hello tom
b.txt
hello jerry
hello jerry
tom jerry
c.txt
hello jerry
hello tom
The expected result is as follows:
hello	a.txt->3	b.txt->2	c.txt->2
jerry	b.txt->3	a.txt->1	c.txt->1
tom	a.txt->2	b.txt->1	c.txt->1
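To make the target clearer before the MapReduce version, here is a minimal plain-Java sketch that builds the same word -> (file -> count) index in memory. The class name InvertedIndexLocal and the hardcoded sample data are illustrative only, not part of the original job:

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal in-memory sketch of the inverted index: word -> (file -> count).
public class InvertedIndexLocal {
    public static void main(String[] args) {
        // Hardcoded copies of a.txt, b.txt, c.txt from above.
        String[][] files = {
            {"a.txt", "hello tom\nhello jerry\nhello tom"},
            {"b.txt", "hello jerry\nhello jerry\ntom jerry"},
            {"c.txt", "hello jerry\nhello tom"}
        };
        Map<String, Map<String, Integer>> index = new LinkedHashMap<>();
        for (String[] file : files) {
            for (String line : file[1].split("\n")) {
                for (String word : line.split(" ")) {
                    index.computeIfAbsent(word, w -> new LinkedHashMap<>())
                         .merge(file[0], 1, Integer::sum);
                }
            }
        }
        // Prints e.g. hello  a.txt->3  b.txt->2  c.txt->2
        // (ordering may differ from the MapReduce output, which sorts by key).
        index.forEach((word, postings) -> {
            StringBuilder sb = new StringBuilder(word);
            postings.forEach((f, n) -> sb.append("\t").append(f).append("->").append(n));
            System.out.println(sb);
        });
    }
}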
2. The code:
package itcastmr.inverseindex; // matches the fully-qualified class name in the hadoop jar command below

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndex {

    public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            // The InputSplit tells us which file this mapper is reading.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // getPath().toString() would give the full path, e.g. hdfs://itcast:9000/ii/a.txt;
            // getName() yields just "a.txt", matching the expected output above.
            String path = inputSplit.getPath().getName();
            // k2,v2 is hello->a.txt {1,1,1}
            for (String word : words) {
                k.set(word + "->" + path);
                v.set("1");
                context.write(k, v);
            }
        }
    }

    public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // k2,v2 hello->a.txt {1,1,1} becomes k3,v3 hello, a.txt->3
            int counter = 0;
            for (Text text : values) {
                counter += Integer.parseInt(text.toString());
            }
            String[] wordAndPath = key.toString().split("->");
            String word = wordAndPath[0];
            String path = wordAndPath[1];
            k.set(word);
            v.set(path + "->" + counter);
            context.write(k, v);
        }
    }

    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        private Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The shuffle groups all map outputs with the same k2 together, which is
            // why values is an Iterable: every v2 for one key arrives as one collection.
            // The combiner output hello, a.txt->3 is concatenated into one line per word.
            StringBuilder result = new StringBuilder();
            for (Text t : values) {
                result.append(t.toString()).append("\t");
            }
            v.set(result.toString().trim());
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(InverseIndex.class);

        job.setMapperClass(IndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(IndexCombiner.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(IndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1); // 0 = normal exit, 1 = abnormal exit
    }
}
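One practical note: FileOutputFormat refuses to start if the output directory already exists, so rerunning the job fails with a FileAlreadyExistsException. A small guard in main() can delete it first. This is a sketch, not part of the original post, and assumes one extra import of org.apache.hadoop.fs.FileSystem:

// Optional guard, placed just before FileOutputFormat.setOutputPath(...):
// delete a pre-existing output directory so the job can be rerun.
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[1]);
if (fs.exists(out)) {
    fs.delete(out, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, out);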
3. Package into a jar and run it from the command line:
hadoop jar /root/itcastmr.jar itcastmr.inverseindex.InverseIndex /user/root/InverseIndex /InverseIndexResult
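Note: this assumes the three source files were already uploaded to the input directory, e.g. (paths match the command above):
hadoop fs -mkdir -p /user/root/InverseIndex
hadoop fs -put a.txt b.txt c.txt /user/root/InverseIndex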
View the result file:
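With the default single reducer, the whole index lands in one part file (the file name below assumes the standard part-r-00000 naming):
hadoop fs -cat /InverseIndexResult/part-r-00000
The three lines printed should match the expected result in section 1.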
Reposted from the SummerChill cnblogs blog. Original link: http://www.cnblogs.com/DreamDrive/p/7400391.html. For reposting, please contact the original author.