Requirement
There is a large amount of text (documents, web pages) for which a search index needs to be built.
Raw data
a.txt
map reduce MapReduce index Inverted index Inverted index 倒排索引 大数据 hadoop MapReduce hdfs Inverted index
b.txt
hadoop MapReduce hdfs Inverted index 倒排索引 大数据 map reduce MapReduce
c.txt
Inverted index 倒排索引 大数据 hadoop MapReduce hdfs Inverted index hadoop MapReduce hdfs Inverted index map reduce MapReduce
Expected result:
Inverted	b.txt--->1	a.txt--->3	c.txt--->3
MapReduce	a.txt--->2	b.txt--->2	c.txt--->3
hadoop	a.txt--->1	b.txt--->1	c.txt--->2
hdfs	a.txt--->1	b.txt--->1	c.txt--->2
index	b.txt--->1	c.txt--->3	a.txt--->4
map	a.txt--->1	b.txt--->1	c.txt--->1
reduce	a.txt--->1	b.txt--->1	c.txt--->1
倒排索引	a.txt--->1	b.txt--->1	c.txt--->1
大数据	a.txt--->1	b.txt--->1	c.txt--->1
Approach
We can chain multiple jobs together and use two MapReduce jobs.
Expected intermediate result of the first job:
Inverted--a.txt	3
Inverted--b.txt	1
Inverted--c.txt	3
MapReduce--a.txt	2
MapReduce--b.txt	2
MapReduce--c.txt	3
hadoop--a.txt	1
hadoop--b.txt	1
hadoop--c.txt	2
hdfs--a.txt	1
hdfs--b.txt	1
hdfs--c.txt	2
index--a.txt	4
index--b.txt	1
index--c.txt	3
map--a.txt	1
map--b.txt	1
map--c.txt	1
reduce--a.txt	1
reduce--b.txt	1
reduce--c.txt	1
倒排索引--a.txt	1
倒排索引--b.txt	1
倒排索引--c.txt	1
大数据--a.txt	1
大数据--b.txt	1
大数据--c.txt	1
Expected output of the second job:
The final expected result shown above.
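To make the record shapes explicit, here is a sketch of the key/value pairs at each stage, using the word hadoop from the sample data above:

Job 1 map output:     (hadoop--a.txt, 1), (hadoop--b.txt, 1), (hadoop--c.txt, 1), (hadoop--c.txt, 1)   one pair per occurrence, filename taken from the input split
Job 1 reduce output:  hadoop--a.txt  1,  hadoop--b.txt  1,  hadoop--c.txt  2                           counts summed per word--file key
Job 2 map output:     (hadoop, a.txt  1), (hadoop, b.txt  1), (hadoop, c.txt  2)                        key and value split on "--"
Job 2 reduce output:  hadoop  a.txt--->1  b.txt--->1  c.txt--->2                                       all files concatenated per word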
The code
0. A reusable driver class
package com.hfl.driver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class Drive {

    /**
     * Map-only job with a cached side file.
     * @param object     driver class (used for setJarByClass)
     * @param mymap      Mapper class
     * @param mymapkey   map output key class
     * @param mymapvalue map output value class
     * @param num        number of reduce tasks (0 for a map-only job)
     * @param args1      FileInputFormat input path
     * @param args2      FileOutputFormat output path
     * @param args3      path of the side file to put in the distributed cache
     */
    public static void run(Class<?> object, Class<? extends Mapper> mymap, Class<?> mymapkey, Class<?> mymapvalue,
                           int num, String args1, String args2, String args3)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1 Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar
        job.setJarByClass(object);

        // 3 Set the mapper
        job.setMapperClass(mymap);

        // 4 Set the map output types
        job.setMapOutputKeyClass(mymapkey);
        job.setMapOutputValueClass(mymapvalue);

        // Cache the small side file (for a plain file, addCacheFile is the usual call)
        job.addCacheArchive(new URI(args3));

        // Set the number of reduce tasks (0 makes it a map-only job)
        job.setNumReduceTasks(num);

        // Delete the output path if it already exists
        Path path = new Path(args2);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args1));
        FileOutputFormat.setOutputPath(job, new Path(args2));

        // 6 Submit
        job.waitForCompletion(true);
    }

    /**
     * Map + reduce job.
     * @param object        driver class (used for setJarByClass)
     * @param mymap         Mapper class
     * @param mymapkey      map output key class
     * @param mymapvalue    map output value class
     * @param myreduce      Reducer class
     * @param myreducekey   reduce output key class
     * @param myreducevalue reduce output value class
     * @param args1         input path
     * @param args2         output path
     */
    public static void run(Class<?> object, Class<? extends Mapper> mymap, Class<?> mymapkey, Class<?> mymapvalue,
                           Class<? extends Reducer> myreduce, Class<?> myreducekey, Class<?> myreducevalue,
                           String args1, String args2)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1 Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar
        job.setJarByClass(object);

        // 3 Set the mapper
        job.setMapperClass(mymap);

        // 4 Set the map output types
        job.setMapOutputKeyClass(mymapkey);
        job.setMapOutputValueClass(mymapvalue);

        // Set the reducer and its output types
        job.setReducerClass(myreduce);
        job.setOutputKeyClass(myreducekey);
        job.setOutputValueClass(myreducevalue);

        // Delete the output path if it already exists
        Path path = new Path(args2);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args1));
        FileOutputFormat.setOutputPath(job, new Path(args2));

        // 6 Submit
        job.waitForCompletion(true);
    }
}
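Only the second overload (mapper plus reducer) is used by the two index jobs below; the first one targets map-only jobs that load a side file into the distributed cache. A minimal calling sketch follows, with hypothetical driver/mapper/reducer names and paths that are not part of this example:

// Map-only job with 0 reduce tasks and a cached side file (hypothetical classes and paths).
// The cache path is parsed with new URI(...), so pass a URI-style path, not a backslash path.
Drive.run(MyDriver.class, MyMapper.class, Text.class, NullWritable.class,
        0, "F:\\input\\demo", "F:\\output\\demo", "hdfs://localhost:9000/cache/side.zip");

// Map + reduce job: the overload used by Index1 and Index2 below.
Drive.run(MyDriver.class, MyMapper.class, Text.class, IntWritable.class,
        MyReducer.class, Text.class, IntWritable.class,
        "F:\\input\\demo", "F:\\output\\demo");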
1. Job 1
package com.hfl.index;

import com.hfl.driver.Drive;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.net.URISyntaxException;

public class Index1 {
    public static void main(String[] args) throws ClassNotFoundException, URISyntaxException, InterruptedException, IOException {
        args = new String[]{"F:\\input\\index", "F:\\input\\index1"};
        Drive.run(Index1.class, Index1Map.class, Text.class, IntWritable.class,
                Index1Reduce.class, Text.class, IntWritable.class, args[0], args[1]);
    }
}

// map
class Index1Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable();
    String name;

    /**
     * Get the name of the current input file once, during setup
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line, e.g. "index Inverted index"
        String line = value.toString();
        // Split on spaces
        String[] split = line.split(" ");
        for (String s : split) {
            // Emit (word--filename, 1) for every occurrence
            k.set(s + "--" + name);
            v.set(1);
            context.write(k, v);
        }
    }
}

// word--a.txt  (1, 1, ...)
class Index1Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the occurrences for this word--file key
        int count = 0;
        for (IntWritable iw : values) {
            count += iw.get();
        }
        v.set(count);
        context.write(key, v);
    }
}
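For a.txt, for example, the mapper emits one (word--a.txt, 1) pair per occurrence, such as (map--a.txt, 1), (index--a.txt, 1) and (Inverted--a.txt, 1), and the reducer then sums them into lines like index--a.txt 4 and Inverted--a.txt 3, matching the intermediate result listed above.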
The output of job 1 matches the expected intermediate result listed above.
2. Job 2
package com.hfl.index;

import com.hfl.driver.Drive;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.net.URISyntaxException;

public class Index2 {
    public static void main(String[] args) throws ClassNotFoundException, URISyntaxException, InterruptedException, IOException {
        args = new String[]{"F:\\input\\index1", "F:\\input\\index2"};
        Drive.run(Index2.class, Index2Map.class, Text.class, Text.class,
                Index2Reduce.class, Text.class, Text.class, args[0], args[1]);
    }
}

// map
class Index2Map extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line looks like "Inverted--a.txt\t3"
        String s = value.toString();
        String[] split = s.split("--");
        k.set(split[0]);   // the word
        v.set(split[1]);   // "a.txt\t3"
        context.write(k, v);
    }
}

// 大数据  (a.txt 1, b.txt 1, c.txt 1)
class Index2Reduce extends Reducer<Text, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Concatenate all "file\tcount" values for this word into "file--->count" pairs
        StringBuilder sb = new StringBuilder();
        for (Text t : values) {
            String s = t.toString();
            String[] split = s.split("\t");
            sb.append(split[0] + "--->" + split[1] + "\t");
        }
        k.set(key);
        v.set(sb.toString());
        context.write(k, v);
    }
}
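For the key 大数据, for instance, the reducer receives the values a.txt 1, b.txt 1 and c.txt 1 and concatenates them into the single output line 大数据  a.txt--->1  b.txt--->1  c.txt--->1.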
The output of job 2 matches the final expected result shown at the beginning.
Optimizing the multi-job chaining
Write a single main (test) class and configure both jobs inside one main method.
package com.hfl.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class IndexMain {
    public static void main(String[] args) throws IOException, InterruptedException {
        args = new String[]{"F:\\input\\index", "F:\\input\\index1", "F:\\input\\index2"};

        Configuration conf = new Configuration();

        // Job 1: count occurrences of each word per file
        Job job1 = Job.getInstance(conf);
        job1.setMapperClass(Index1Map.class);
        job1.setReducerClass(Index1Reduce.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        // Delete job1's output path if it already exists
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, path);

        // Job 2: group the per-file counts by word
        Job job2 = Job.getInstance(conf);
        job2.setMapperClass(Index2Map.class);
        job2.setReducerClass(Index2Reduce.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        // Delete job2's output path if it already exists
        Path path1 = new Path(args[2]);
        if (fs.exists(path1)) {
            fs.delete(path1, true);
        }
        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, path1);

        // Wrap each MapReduce job in a ControlledJob
        ControlledJob ajob = new ControlledJob(job1.getConfiguration());
        ControlledJob bjob = new ControlledJob(job2.getConfiguration());

        // Create a JobControl group (the group name is arbitrary) to manage the ControlledJobs
        JobControl control = new JobControl("hfl");

        // Declare the dependency: bjob may only run after ajob has finished
        bjob.addDependingJob(ajob);

        // Add both jobs to the group
        control.addJob(ajob);
        control.addJob(bjob);

        // JobControl implements Runnable, so start it in its own thread
        Thread thread = new Thread(control);
        thread.start();

        /* The JobControl thread may still be running when main would otherwise return.
           Poll allFinished() and sleep for a second between checks, and only exit the JVM
           once all jobs in the group have completed. */
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        System.exit(0);
    }
}
The result is the same as before.
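As a side note (a sketch, not part of the original setup): with only two jobs, the same chaining can be done without JobControl by waiting for job1 and submitting job2 only if it succeeded. This requires main to also declare ClassNotFoundException, which waitForCompletion throws.

// Sequential chaining without JobControl (main must also declare ClassNotFoundException)
if (job1.waitForCompletion(true)) {
    System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
System.exit(1);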
And with that, we're done!