jar包
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> </dependency>
Hadoop 2.x以后,hadoop-core被拆成了一些零散的包(例如hadoop-common、hadoop-mapreduce-client-core),不再有core包了
代码:
package org.conan.myhadoop.mr; import java.io.IOException; import org.apache.hadoop.conf.Configuration; //org.apache.hadoop.mapred 老系统的包 //org.apache.hadoop.mapreduce 新系统的包 import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* * ModuleMapReduce Class * 单纯的注释 */ public class ModuleMapReduce extends Configured implements Tool { /** * * ModuleMapper Class 不仅有注释的功效而且你鼠标放在你注释的方法上面他会把你注释的内容显示出来, * */ public static class ModuleMapper extends Mapper<LongWritable, Text, LongWritable, Text> { @Override public void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO } @Override public void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } /** * * ModuleReducer Class * */ public static class ModuleReducer extends Reducer<LongWritable, Text, LongWritable, Text> { @Override public void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.setup(context); } @Override protected void reduce(LongWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException { // TODO } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } // Driver 驱动 // @Override //实现接口时关键字1.5和1.7的JDK都会报错,只有1.6不报错 public int run(String[] args) throws Exception { Job job = 
parseInputAndOutput(this, this.getConf(), args); // 2.set job // step 1:set input job.setInputFormatClass(TextInputFormat.class); // step 3:set mappper class job.setMapperClass(ModuleMapper.class); // step 4:set mapout key/value class job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // step 5:set shuffle(sort,combiner,group) // set sort job.setSortComparatorClass(LongWritable.Comparator.class); // set combiner(optional,default is unset)必须是Reducer的子类 job.setCombinerClass(ModuleReducer.class); // set grouping job.setGroupingComparatorClass(LongWritable.Comparator.class); // step 6 set reducer class job.setReducerClass(ModuleReducer.class); // step 7:set job output key/value class job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // step 8:set output format job.setOutputFormatClass(FileOutputFormat.class); // step 10: submit job Boolean isCompletion = job.waitForCompletion(true);// 提交job return isCompletion ? 0 : 1; } public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException { // 输入参数的合法性 if (args.length != 2) { System.err.printf( "Usage: %s [generic options] <input> <output> \n ", tool .getClass().getSimpleName()); //%s表示输出字符串,也就是将后面的字符串替换模式中的%s ToolRunner.printGenericCommandUsage(System.err); return null; } // 1.create job Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(ModuleMapReduce.class); // step 2:set input path Path inputPath = new Path(args[0]); FileInputFormat.addInputPath(job, inputPath); // step 9:set output path Path outputPath = new Path(args[0]); FileOutputFormat.setOutputPath(job, outputPath); return job; } public static void main(String[] args) { try { int status = ToolRunner.run(new ModuleMapReduce(), args);// 返回值即为isCompletion ? 0 : 1 System.exit(status);// System.exit(0)中断虚拟机的运行,退出应用程序,0表示没有异常正常退出。 } catch (Exception e) { e.printStackTrace(); } } }
倒排索引代码
输入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b),中间用空格隔开
要求:
将以上文件以如下格式输出:
110 18987655436|15699807656
112 13588888888|2543789|011-678987
13509098987 13678987879
说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割
package org.conan.myhadoop.mr; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ReverseIndex extends Configured implements Tool { enum Counter { LINESKIP, // 出错的行 } public static class Map extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // 读取源数据 try { // 数据处理 String[] lineSplit = line.split(" "); String anum = lineSplit[0]; String bnum = lineSplit[1]; context.write(new Text(bnum), new Text(anum)); // 输出 } catch (java.lang.ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1 return; } } } public static class Reduce extends Reducer { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String valueString; String out = ""; for (Text value : values) { valueString = value.toString(); out += valueString + "|"; System.out.println("Ruduce:key=" + key + " value=" + value); } context.write(key, new Text(out)); } } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); Job job = new Job(conf, "ReverseIndex"); // 任务名 job.setJarByClass(ReverseIndex.class); // 指定Class FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径 job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码 job.setReducerClass(ReverseIndex.Reduce.class); // 
调用上面Reduce类作为Reduce任务代码 job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式 job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式 job.waitForCompletion(true); // 输出任务完成情况 System.out.println("任务名称:" + job.getJobName()); System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("输入行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("输出行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { // 判断参数个数是否正确 // 如果无参数运行则显示以作程序说明 if (args.length != 2) { System.err.println(""); System.err .println("Usage: ReverseIndex < input path > < output path > "); System.err .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out"); System.exit(-1); } // 记录开始时间 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 运行任务 int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args); // 输出任务耗时 Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任务开始:" + formatter.format(start)); System.out.println("任务结束:" + formatter.format(end)); System.out.println("任务耗时:" + String.valueOf(time) + " 分钟"); System.exit(res); } }
去重代码
//Mapper任务 static class DDMap extends Mapper<LongWritable,Text,Text,Text>{ private static Text line = new Text(); protected void map(LongWritable k1,Text v1,Context context){ line = v1; Text text = new Text(""); try { context.write(line,text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; } //Reducer任务 static class DDReduce extends Reducer<Text,Text,Text,Text>{ protected void reduce(Text k2,Iterable<Text> v2s,Context context){ Text text = new Text(""); try { context.write(k2, text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; }
参考文章:
一个经典的MapReduce模板代码,倒排索引(ReverseIndex)
http://blog.itpub.net/26400547/viewspace-1214945/
详解MapReduce实现数据去重与倒排索引应用场景案例
http://www.tuicool.com/articles/emi6Fb
本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1698489