Project structure:

The code used throughout this example is listed below.

The code for WordCountMapper:
package cn.toto.bigdata.mr.wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * This Mapper comes from hadoop-mapreduce-client-core-2.8.0.jar.
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN:    the type of the key the framework reads. With the default InputFormat, the key is the byte offset at which a line starts, so the type is Long.
 * VALUEIN:  the type of the value the framework reads. With the default InputFormat, the value is the content of one line, so the type is String.
 * KEYOUT:   the key type of the data returned by the user-defined logic, determined by the business logic. In this wordcount program the output key is a word, so it is String.
 * VALUEOUT: the value type of the data returned by the user-defined logic, determined by the business logic. In this wordcount program the output value is a word count, so it is Integer.
 *
 * However, JDK types such as String and Long serialize inefficiently. To speed up serialization,
 * Hadoop defines its own serialization framework, so any data that needs to be serialized in a
 * Hadoop program (written to disk or sent over the network) must use types that implement it:
 *
 * Long    ----> LongWritable
 * String  ----> Text
 * Integer ----> IntWritable
 * Null    ----> NullWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * This is the user-defined business-logic method called by MapTask, the main program of the MapReduce framework.
     * MapTask drives the InputFormat to read data as (KEYIN, VALUEIN) pairs, and each KV pair it reads is passed into one call of this map method.
     * With the default InputFormat implementation, the key here is the byte offset of a line and the value is the line's content.
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
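For intuition, here is a minimal plain-Java sketch of what one map() call does with a line; the printed pairs stand in for the context.write calls, and no Hadoop runtime is involved (the class name MapLogicDemo is made up for illustration):

public class MapLogicDemo {
    public static void main(String[] args) {
        String line = "The true nobility is in being superior to your previous self";
        for (String word : line.split(" ")) {
            // Each iteration corresponds to one context.write(new Text(word), new IntWritable(1))
            System.out.println("(" + word + ", 1)");
        }
    }
}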
The code for WordCountReducer:
package cn.toto.bigdata.mr.wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * ReduceTask is what calls the reduce method we wrote.
     * Each ReduceTask receives a portion of the output of all MapTasks from the previous (map) phase
     * (a key goes to this ReduceTask when key.hashCode() % numReduceTasks == this task's number).
     * When the ReduceTask processes the KV pairs it received, it calls our reduce method like this:
     * it first groups all received KV pairs by key (by key equality),
     * then passes the key of the first KV pair of a group as the reduce method's key parameter,
     * and all the values of that group as an iterator in the values parameter.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}
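The grouping described in the comment above can be simulated locally. A rough plain-Java sketch of what the shuffle plus reduce does for this job, where a HashMap stands in for the framework's sort-and-group (an illustration only, not how the framework is implemented):

import java.util.HashMap;
import java.util.Map;

public class ReduceLogicDemo {
    public static void main(String[] args) {
        // Keys as they might arrive from the mappers, each carrying a value of 1
        String[] mapOutputKeys = {"is", "is", "is", "The", "The"};
        // The shuffle groups identical keys; reduce() then sums the 1s per group
        Map<String, Integer> counts = new HashMap<>();
        for (String key : mapOutputKeys) {
            counts.merge(key, 1, Integer::sum);
        }
        System.out.println(counts); // {The=2, is=3}
    }
}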
The code for WordCountDriver:
package cn.toto.bigdata.mr.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * This class is the client that specifies the many parameters the wordcount job needs at runtime:
 * for example, which component reads the data and which writes the results,
 * which class holds the map-phase business logic and which the reduce-phase logic,
 * the path of the jar containing the wordcount job,
 * ....
 * Preparation before running:
 * 1. Export the current project as wordcount.jar.
 * 2. Prepare /home/toto/software/wordcount/input/a.txt, with contents like:
 *    The true nobility is in being superior to your previous self guess
 * 3. Upload /home/toto/software/wordcount to HDFS with: hadoop fs -put wordcount /wordcount
 *
 * Along with the various other parameters it needs, run:
 * hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 * which is equivalent to:
 * java -cp wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 *
 * That is, hadoop jar puts the jars on the Hadoop classpath onto the application's classpath,
 * and runs cn.toto.bigdata.mr.wc.WordCountDriver as the main class.
 *
 * Finally, view the result with: hadoop fs -cat /wordcount/output/part-r-00000
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop:9000");
/*conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "mini1");*/
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program lives
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);
        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Tell the framework the output data types of our mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Tell the framework which format components to use for reading input and writing output
        // TextInputFormat is the MapReduce framework's built-in input component for reading text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Tell the framework which path holds the files to process
        // (note: the input path /wordcount/input/ must already exist in HDFS)
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input/"));
        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));
boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Preparation before running:

1. Export the current project as wordcount.jar.
2. Prepare /home/toto/software/wordcount/input/a.txt, with contents like:
   The true nobility is in being superior to your previous self guess
3. Upload /home/toto/software/wordcount to HDFS with: hadoop fs -put wordcount /wordcount
Finally, run:
hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
After the job finishes, its result is in /wordcount/output/.
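Besides hadoop fs -cat, the result file can also be read programmatically. A minimal sketch using the HDFS FileSystem API, assuming the same fs.defaultFS address as in the driver above (the class name ReadWordCountResult is made up for illustration):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadWordCountResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop:9000");
        try (FileSystem fs = FileSystem.get(conf);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(fs.open(new Path("/wordcount/output/part-r-00000"))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // each line is "word<TAB>count"
            }
        }
    }
}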
B: Running WordCount locally, and with a Combiner (the main change is in WordCountDriver). The code is as follows:
package cn.toto.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * The same client driver as above (see the comments on the first WordCountDriver),
 * but configured to run locally and to use a Combiner.
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop:9000");
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/

        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program lives
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Use a Combiner. The benefit is that the data is pre-aggregated inside each
        // map task, and only the partial sums are passed to the reducer, which then
        // sums them again. This reduces the reducer's workload: each map task does its
        // own local aggregation, so efficiency improves by spreading the work around.
        // Since a combiner follows the same programming contract as a reducer (its
        // parent class is Reducer), in this scenario we can simply set the combiner
        // to WordCountReducer.
        job.setCombinerClass(WordCountReducer.class);

        // Tell the framework the output data types of our mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Tell the framework which format components to use for reading input and writing output
        // TextInputFormat is the MapReduce framework's built-in input component for reading text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework which path holds the files to process (a local path this time)
        FileInputFormat.setInputPaths(job, new Path("e:/wordcount/input/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("e:/wordcount/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Preparation:

Prepare e:/wordcount/input/a.txt on the E drive, with the following contents:
The true nobility is in being superior to your previous self guess
No great discovery was ever made without a bold
Knowledge will give you power but character respect
The sun is just rising in the morning of another day
I I figure life is a gift and I don't intend on wasting
Right-click and run the code above, then open E:\wordcount\output\part-r-00000 to see the results, which are:
I 3
Knowledge 1
No 1
The 2
a 2
and 1
another 1
being 1
bold 1
but 1
character 1
day 1
discovery 1
don't 1
ever 1
figure 1
gift 1
give 1
great 1
guess 1
in 2
intend 1
is 3
just 1
life 1
made 1
morning 1
nobility 1
of 1
on 1
power 1
previous 1
respect 1
rising 1
self 1
sun 1
superior 1
the 1
to 1
true 1
was 1
wasting 1
will 1
without 1
you 1
your 1
After all the steps above, the program is complete.
Summary:

3. The Combiner in MapReduce
(1) A combiner is a component in an MR program besides the Mapper and Reducer.
(2) The parent class of a combiner is Reducer.
(3) The difference between a combiner and a reducer is where they run:
    the Combiner runs on the node of each individual map task;
    the Reducer receives the output of all Mappers globally.
(4) The purpose of a combiner is to locally aggregate each map task's output, reducing the amount of data transferred over the network.
    Concrete implementation steps (see the sketch after this list):
    1. Define a custom combiner class that extends Reducer and override its reduce method.
    2. Set it on the job: job.setCombinerClass(CustomCombiner.class)
(5) A combiner can only be applied when it does not affect the final business logic (for example, summation is safe, but averaging is not),
    and its output KV types must match the reducer's input KV types.
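The WordCount case above reuses WordCountReducer as the combiner, but following steps 1 and 2 a dedicated class would look roughly like this (the name WordCountCombiner is illustrative, not part of the original project):

package cn.toto.bigdata.mr.wc;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A combiner is just a Reducer that runs on each map task's local output
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        // Input and output KV types are identical, so chaining into the real reducer stays valid
        context.write(key, new IntWritable(count));
    }
}

It would then be registered with job.setCombinerClass(WordCountCombiner.class); instead of passing WordCountReducer.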
===============================================================================
Flow statistics, with a custom class implementing serialization:
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * For a custom class to be used by MapReduce it must be serializable: implement WritableComparable.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNbr;
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    /**
     * The serialization framework calls the no-arg constructor when it creates
     * an object instance during deserialization.
     */
    public FlowBean() {}

    /**
     * Serialization method.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // Guard against an unset phoneNbr: FlowSum and FlowSumSort only call
        // set(upFlow, dFlow), and writeUTF(null) would throw a NullPointerException
        out.writeUTF(phoneNbr == null ? "" : phoneNbr);
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Note: fields must be read back in the same order they were serialized.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNbr = in.readUTF();
        this.upFlow = in.readLong();
        this.dFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Descending order by total flow; Long.compare avoids the overflow that
        // casting (o.getSumFlow() - this.sumFlow) to int could cause
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }

    public void set(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public void set(String phoneNbr, long upFlow, long dFlow) {
        this.phoneNbr = phoneNbr;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getdFlow() { return dFlow; }
    public void setdFlow(long dFlow) { this.dFlow = dFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    public String getPhoneNbr() { return phoneNbr; }
    public void setPhoneNbr(String phoneNbr) { this.phoneNbr = phoneNbr; }

    @Override
    public String toString() {
        return "FlowBean [phoneNbr=" + phoneNbr + ", upFlow=" + upFlow
                + ", dFlow=" + dFlow + ", sumFlow=" + sumFlow + "]";
    }
}
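As a sanity check that write and readFields agree on field order, FlowBean can be round-tripped through plain Java data streams outside of Hadoop. A small sketch (the class name FlowBeanRoundTrip is made up for illustration):

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean in = new FlowBean();
        in.set("13726230503", 2481, 24681);

        // Serialize exactly as Hadoop would when spilling to disk or shuffling
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance (created via the no-arg constructor)
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out);
        // FlowBean [phoneNbr=13726230503, upFlow=2481, dFlow=24681, sumFlow=27162]
    }
}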
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSum {

    // Passing our own custom objects as KV is allowed, but they must implement
    // Hadoop's serialization mechanism (implements Writable)
    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line we read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNbr);
            v.set(upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        /**
         * The key the reduce method receives is the first phone number of a group
         * <phoneA, bean><phoneA, bean><phoneA, bean>;
         * the values parameter is an iterator over all the beans in that group.
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upFlowCount = 0;
            long dFlowCount = 0;
            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                dFlowCount += bean.getdFlow();
            }
            v.set(upFlowCount, dFlowCount);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program lives
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(FlowSum.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Tell the framework the output data types of our mapper and reducer
        /*
         * job.setMapOutputKeyClass(Text.class);
         * job.setMapOutputValueClass(FlowBean.class);
         */
        // If the map output types match the final output types, these two lines are enough
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // These two are the framework's default input/output components, so the lines can be omitted
        /*
         * job.setInputFormatClass(TextInputFormat.class);
         * job.setOutputFormatClass(TextOutputFormat.class);
         */

        // Tell the framework which path holds the files to process
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/input/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Totals the flow and sorts by total flow in descending order.
 * Precondition: the input is an already-summarized result file.
 */
public class FlowSumSort {

    public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        FlowBean k = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");

            String phoneNbr = fields[0];
            long upFlowSum = Long.parseLong(fields[1]);
            long dFlowSum = Long.parseLong(fields[2]);

            k.set(upFlowSum, dFlowSum);
            v.set(phoneNbr);
            context.write(k, v);
        }
    }

    public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean bean, Iterable<Text> phoneNbrs, Context context)
                throws IOException, InterruptedException {
            context.write(phoneNbrs.iterator().next(), bean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Tell the framework the output data types of our reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework which path holds the files to process
        // (note: this is FlowSum's output directory, so FlowSum must be run first)
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/output/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
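Since FlowSumSort reads FlowSum's output, the two jobs must run in order. They can also be chained in a single driver, because waitForCompletion(true) blocks until a job finishes. A rough sketch (the class name ChainedFlowDriver is hypothetical; the paths are the same as above):

package cn.toto.bigdata.mr.wc.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedFlowDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // First job: sum up/down flow per phone number
        Job sumJob = Job.getInstance(conf, "flow-sum");
        sumJob.setJarByClass(FlowSum.class);
        sumJob.setMapperClass(FlowSum.FlowSumMapper.class);
        sumJob.setReducerClass(FlowSum.FlowSumReducer.class);
        sumJob.setOutputKeyClass(Text.class);
        sumJob.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(sumJob, new Path("E:/learnTempFolder/flow/input/"));
        FileOutputFormat.setOutputPath(sumJob, new Path("E:/learnTempFolder/flow/output/"));

        // Block until the first job finishes, so its output directory exists
        // before the sort job tries to read it
        if (!sumJob.waitForCompletion(true)) {
            System.exit(1);
        }

        // Second job: sort the summed results by total flow, descending
        Job sortJob = Job.getInstance(conf, "flow-sum-sort");
        sortJob.setJarByClass(FlowSumSort.class);
        sortJob.setMapperClass(FlowSumSort.FlowSumSortMapper.class);
        sortJob.setReducerClass(FlowSumSort.FlowSumSortReducer.class);
        sortJob.setMapOutputKeyClass(FlowBean.class);
        sortJob.setMapOutputValueClass(Text.class);
        sortJob.setOutputKeyClass(Text.class);
        sortJob.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(sortJob, new Path("E:/learnTempFolder/flow/output/"));
        FileOutputFormat.setOutputPath(sortJob, new Path("E:/learnTempFolder/flow/sortout/"));

        System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
    }
}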
Simulating the runtime environment:

1. Set the environment variable HADOOP_HOME=E:\learnTempFolder\hadoop-2.7.3
2. Download Windows 10 compatible builds of winutils.exe and hadoop.dll (for example from a CSDN resource download) and place them at E:\learnTempFolder\hadoop-2.7.3\bin\winutils.exe and E:\learnTempFolder\hadoop-2.7.3\bin\hadoop.dll
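As an alternative to the HADOOP_HOME environment variable, the same location can be supplied in code at the very start of main(), before any Hadoop class initializes. A minimal sketch:

public static void main(String[] args) throws Exception {
    // Equivalent to setting HADOOP_HOME: point hadoop.home.dir at the directory
    // whose bin/ contains winutils.exe (must run before any Hadoop class loads)
    System.setProperty("hadoop.home.dir", "E:/learnTempFolder/hadoop-2.7.3");
    // ... the rest of the driver code unchanged ...
}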
3. Prepare the data to process:
The data file HTTP_20130313143750.dat, with contents like:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

4. First run FlowSum (right-click and run it as a Java application).
The run produces the file E:\learnTempFolder\flow\output\part-r-00000, with the following contents:
13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548

5. Run FlowSumSort (note: do not delete the part-r-00000 file generated above).
The file produced by this run contains:
13502468823 7335 110349 117684
13925057413 11058 48243 59301
13726230503 2481 24681 27162
18320173382 9531 2412 11943
13560439658 2034 5892 7926
13660577991 6960 690 7650
15013685858 3659 3538 7197
13922314466 3008 3720 6728
15920133257 3156 2936 6092
84138413 4116 1432 5548
13602846565 1938 2910 4848
18211575961 1527 2106 3633
15989002119 1938 180 2118
13560436666 1116 954 2070
13926435656 132 1512 1644
13480253104 180 180 360
13826544101 264 0 264
13719199419 240 0 240
Of course, we can also sum and sort in a single pass and write the result to a given output directory. The code is as follows:
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneStepSumSort {

    public static class OneStepSumSortMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line we read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class OneStepSumSortReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // Collect the per-phone totals here; the TreeMap keeps them sorted by
        // FlowBean.compareTo (descending sumFlow)
        TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upCount = 0;
            long dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            Text text = new Text(key.toString());
            // Caveat: TreeMap treats compareTo() == 0 as the same key, so two
            // phone numbers with equal sumFlow collide here (see the note below)
            treeMap.put(sumBean, text);
        }

        @Override
        protected void cleanup(Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // cleanup() runs once after all reduce() calls; emit the sorted entries here
            Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
            for (Entry<FlowBean, Text> ent : entrySet) {
                context.write(ent.getValue(), ent.getKey());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(OneStepSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(OneStepSumSortMapper.class);
        job.setReducerClass(OneStepSumSortReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the output data types of our reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework which path holds the files to process
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
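One caveat before looking at the output: FlowBean.compareTo compares only sumFlow, and TreeMap treats compareTo() == 0 as the same key, so phone numbers with equal totals overwrite each other (visible below, where 13726238888 ends up paired with 13726230503's bean and some of the 240-total numbers disappear). A sketch of a fix is to give the TreeMap a tie-breaking comparator (assuming Java 8+; this replaces the reducer's treeMap field):

import java.util.Comparator;  // additional import needed

// Hypothetical tie-safe replacement: order by sumFlow descending, then by phone
// number, so beans with equal totals stay distinct keys instead of colliding
TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>(
        Comparator.comparingLong(FlowBean::getSumFlow).reversed()
                  .thenComparing(FlowBean::getPhoneNbr));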
到"E:/flow/sortout/"目录下,查看结果:
即:
13502468823 FlowBean [phoneNbr=13502468823, upFlow=7335, dFlow=110349, sumFlow=117684]
13925057413 FlowBean [phoneNbr=13925057413, upFlow=11058, dFlow=48243, sumFlow=59301]
13726238888 FlowBean [phoneNbr=13726230503, upFlow=2481, dFlow=24681, sumFlow=27162]
18320173382 FlowBean [phoneNbr=18320173382, upFlow=9531, dFlow=2412, sumFlow=11943]
13560439658 FlowBean [phoneNbr=13560439658, upFlow=2034, dFlow=5892, sumFlow=7926]
13660577991 FlowBean [phoneNbr=13660577991, upFlow=6960, dFlow=690, sumFlow=7650]
15013685858 FlowBean [phoneNbr=15013685858, upFlow=3659, dFlow=3538, sumFlow=7197]
13922314466 FlowBean [phoneNbr=13922314466, upFlow=3008, dFlow=3720, sumFlow=6728]
15920133257 FlowBean [phoneNbr=15920133257, upFlow=3156, dFlow=2936, sumFlow=6092]
84138413 FlowBean [phoneNbr=84138413, upFlow=4116, dFlow=1432, sumFlow=5548]
13602846565 FlowBean [phoneNbr=13602846565, upFlow=1938, dFlow=2910, sumFlow=4848]
18211575961 FlowBean [phoneNbr=18211575961, upFlow=1527, dFlow=2106, sumFlow=3633]
15989002119 FlowBean [phoneNbr=15989002119, upFlow=1938, dFlow=180, sumFlow=2118]
13560436666 FlowBean [phoneNbr=13560436666, upFlow=1116, dFlow=954, sumFlow=2070]
13926435656 FlowBean [phoneNbr=13926435656, upFlow=132, dFlow=1512, sumFlow=1644]
13480253104 FlowBean [phoneNbr=13480253104, upFlow=180, dFlow=180, sumFlow=360]
13826544101 FlowBean [phoneNbr=13826544101, upFlow=264, dFlow=0, sumFlow=264]
13926251106 FlowBean [phoneNbr=13719199419, upFlow=240, dFlow=0, sumFlow=240]

(The mismatched pairs such as 13726238888/13726230503 and 13926251106/13719199419, and the missing entries with duplicate totals, are the sumFlow ties colliding in the TreeMap, as described above.)
6. Set up partitions for the phone numbers, so that different phone numbers end up in different files. Method:
A: Below is the custom partitioner. The code is as follows:
package cn.toto.bigdata.mr.wc.flowsum;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * A custom partitioner must extend Partitioner.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        // Map a phone-number prefix to a partition number
        provinceMap.put("138", 0);
        provinceMap.put("139", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("135", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        // All other prefixes fall into partition 5
        return 5;
    }
}
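Before wiring the partitioner into a job, its mapping can be checked directly. A small hypothetical harness (ProvincePartitionerDemo is not part of the original project; numPartitions = 6 matches the reduce task count set in the driver below):

package cn.toto.bigdata.mr.wc.flowsum;

import org.apache.hadoop.io.Text;

public class ProvincePartitionerDemo {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        FlowBean dummy = new FlowBean(); // the value is ignored by getPartition
        for (String phone : new String[] {"13826544101", "13926435656",
                "13660577991", "13726230503", "13560439658", "15013685858"}) {
            // Prefixes 138/139/136/137/135 map to partitions 0-4; everything else to 5
            System.out.println(phone + " -> partition "
                    + partitioner.getPartition(new Text(phone), dummy, 6));
        }
    }
}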
B: Test the custom partitioner:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumProvince {

    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line we read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upCount = 0;
            long dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            context.write(key, sumBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumProvince.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the output data types of our reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Make the shuffle use our custom partitioner, partitioning by phone number.
        // Note: the partitioner returns the six values 0-5, so the number of
        // reduce tasks must not be fewer than 6.
        job.setPartitionerClass(ProvincePartitioner.class);

        // Set the number of reduce tasks
        job.setNumReduceTasks(6);

        // Tell the framework which path holds the files to process
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

        // Tell the framework where to write the results,
        // deleting the output directory first if it already exists
        Path out = new Path("E:/flow/provinceout/");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

C: Preparation needed to run:
Data file:
The file contents are as follows:
(The contents are identical to HTTP_20130313143750.dat as listed in step 3 above.)
The results after running (one part-r-* output file per partition) are as follows:
Contents of part-r-00001 (the numbers with the 139 prefix):
and so on for the other partitions.