1. Input data required by the program
Meaning of the fields in the file: fields 6, 7, 8 and 9 (the indices used after splitting each line on tabs) are the traffic-related fields to be aggregated.
File contents:
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200
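As a quick sanity check of the field positions, the sketch below splits one record on tabs and prints the indices the mapper reads (index 1 for the phone number, indices 6-9 for the traffic values). The sample line is a hypothetical 11-field record in the layout the code comments describe (timestamp, phone, MAC, IP, host, category, up packets, down packets, up bytes, down bytes, status); it is an assumption for illustration, not a line copied from the file above.

public class FieldIndexCheck {
    public static void main(String[] args) {
        // Hypothetical 11-field record, tab separated, in the layout the job expects
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82"
                + "\ti02.c.aliimg.com\t视频网站\t24\t27\t2481\t24681\t200";
        String[] splited = line.split("\t");
        System.out.println("fields : " + splited.length); // 11
        System.out.println("phone  : " + splited[1]);     // 13726230503
        System.out.println("upPkts : " + splited[6]);     // 24   (upstream packets)
        System.out.println("dnPkts : " + splited[7]);     // 27   (downstream packets)
        System.out.println("upBytes: " + splited[8]);     // 2481 (upstream bytes)
        System.out.println("dnBytes: " + splited[9]);     // 24681 (downstream bytes)
    }
}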
2. The program:
package mapreducejob;

/**
 * Sample records provided by the instructor:
 * 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
 * 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
 * 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
 * Fields 6, 7, 8 and 9 (0-based indices after splitting on tab) are:
 * the upstream packet count, the downstream packet count,
 * the total upstream traffic and the total downstream traffic.
 */

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TrafficApp {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(),
                TrafficApp.class.getSimpleName());
        job.setJarByClass(TrafficApp.class); // run from the packaged jar

        FileInputFormat.setInputPaths(job, args[0]); // input: specify the data source

        job.setMapperClass(MyMapper.class); // set the mapper for the job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TrafficWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TrafficWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true); // run on the cluster
    }

    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, TrafficWritable> {
        // The four type parameters are <k1, v1> and <k2, v2>:
        // k1 is the byte offset, v1 is the raw line; k2 is the phone number, v2 is the traffic of one record.
        Text k2 = new Text(); // reused as k2, the phone number
        TrafficWritable v2 = new TrafficWritable(); // reused as v2

        @Override
        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] splited = line.split("\t"); // split on tab to get the fields
            // The raw data file has 11 fields, so the split array has length 11.
            k2.set(splited[1]); // k2 is the phone number, the second element of the array
            v2.set(splited[6], splited[7], splited[8], splited[9]); // v2 holds the four traffic values,
            // i.e. elements 6, 7, 8 and 9 of the split array
            context.write(k2, v2);
        }

    }

    public static class MyReducer extends
            Reducer<Text, TrafficWritable, Text, TrafficWritable> {
        // The four type parameters are <k2, v2> and <k3, v3>:
        // k2 is the phone number, v2 is a TrafficWritable; k3 is the phone number, v3 is the aggregated traffic.
        TrafficWritable v3 = new TrafficWritable();

        @Override
        protected void reduce(
                Text k2,
                Iterable<TrafficWritable> v2s,
                Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
                throws IOException, InterruptedException {
            // The first parameter is k2, the second is v2s, the third is the context.
            // v2s is the collection of traffic records; reduce sums the v2 values into v3.
            long t1 = 0L;
            long t2 = 0L;
            long t3 = 0L;
            long t4 = 0L;
            for (TrafficWritable v2 : v2s) {
                t1 += v2.t1;
                t2 += v2.t2;
                t3 += v2.t3;
                t4 += v2.t4;
            }
            v3.set(t1, t2, t3, t4); // build v3
            context.write(k2, v3);
        }

    }

    /**
     * Custom traffic type. Fields 6-9 of a record are the upstream packet count,
     * the downstream packet count, the total upstream traffic and the total downstream traffic.
     */
    static class TrafficWritable implements Writable {
        // This class carries the upload and download traffic of one phone number.
        // In the MapReduce key/value pairs it serves as v3 and has four columns.
        long t1;
        long t2;
        long t3;
        long t4;

        // A no-arg constructor is required, otherwise Hadoop cannot instantiate the type by reflection.
        public TrafficWritable() {
        }

        public void set(long t1, long t2, long t3, long t4) {
            // setter taking long values
            this.t1 = t1;
            this.t2 = t2;
            this.t3 = t3;
            this.t4 = t4;
        }

        public void set(String t1, String t2, String t3, String t4) {
            // setter taking String values
            this.t1 = Long.parseLong(t1);
            this.t2 = Long.parseLong(t2);
            this.t3 = Long.parseLong(t3);
            this.t4 = Long.parseLong(t4);
        }

        public void readFields(DataInput in) throws IOException {
            // read all four columns back with in.readLong()
            this.t1 = in.readLong();
            this.t2 = in.readLong();
            this.t3 = in.readLong();
            this.t4 = in.readLong();
        }

        public void write(DataOutput out) throws IOException {
            // the object has four columns; all four must be written out
            out.writeLong(t1);
            out.writeLong(t2);
            out.writeLong(t3);
            out.writeLong(t4);
        }

        public String toString() {
            // used when the reducer writes its output; without it the object's hash code would be printed
            return this.t1 + "\t" + this.t2 + "\t" + this.t3 + "\t" + this.t4;
        }
    }

}
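Because TrafficWritable must survive the shuffle, its write() and readFields() have to use the same field order, and the no-arg constructor is what Hadoop uses to rebuild instances. The following is a minimal local round-trip sketch of that contract; the class name TrafficWritableRoundTrip is hypothetical, and it assumes the class lives in the same mapreducejob package because the nested TrafficWritable has default visibility.

package mapreducejob;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Hypothetical local check, not part of the original post: serialize a TrafficWritable
// and read it back to confirm that write() and readFields() agree on the field order.
public class TrafficWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        TrafficApp.TrafficWritable out = new TrafficApp.TrafficWritable();
        out.set(24L, 27L, 2481L, 24681L);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer)); // serialize: 4 longs = 32 bytes

        TrafficApp.TrafficWritable in = new TrafficApp.TrafficWritable(); // needs the no-arg constructor
        in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(in); // expected: 24, 27, 2481, 24681 separated by tabs
    }
}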
//===============================================================
Code 2:
package mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TrafficApp {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName());
        job.setJarByClass(TrafficApp.class);

        FileInputFormat.setInputPaths(job, args[0]);

        job.setMapperClass(TrafficMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TrafficWritable.class);

        job.setNumReduceTasks(2); // use two reduce tasks
        job.setPartitionerClass(TrafficPartitioner.class); // set a custom Partitioner
        /*
         * How does a Partitioner send different map outputs to different reducers?
         * When no Partitioner is specified, the default HashPartitioner is used.
         * Its getPartition() is a single line: whatever the key is, it is taken
         * modulo numReduceTasks, and the result is the number of the target reduce task.
         * numReduceTasks defaults to 1, and any number modulo 1 is 0,
         * so everything goes to the reducer numbered 0 (reducers are numbered from 0).
         */

        job.setReducerClass(TrafficReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TrafficWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    public static class TrafficPartitioner extends Partitioner<Text, TrafficWritable> { // k2, v2

        @Override
        public int getPartition(Text key, TrafficWritable value, int numPartitions) {
            long phoneNumber = Long.parseLong(key.toString());
            return (int) (phoneNumber % numPartitions);
        }

    }

    /**
     * The first type parameter, LongWritable, is the byte offset of the line in the file.
     * The second is the line itself, of type Text.
     * The third is the phone number to output, of type Text.
     * The fourth is our custom traffic type, TrafficWritable.
     * @author ABC
     */
    public static class TrafficMapper extends Mapper<LongWritable, Text, Text, TrafficWritable> {
        Text k2 = new Text();
        TrafficWritable v2 = null;

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] splited = line.split("\t");

            k2.set(splited[1]); // this element is the phone number
            v2 = new TrafficWritable(splited[6], splited[7], splited[8], splited[9]);
            context.write(k2, v2);
        }

    }

    public static class TrafficReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable> {
        @Override
        protected void reduce(Text k2, Iterable<TrafficWritable> v2s,
                Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
                throws IOException, InterruptedException {
            // iterate over v2s, the collection holding this key's traffic records
            long t1 = 0L;
            long t2 = 0L;
            long t3 = 0L;
            long t4 = 0L;

            for (TrafficWritable v2 : v2s) {
                t1 += v2.getT1();
                t2 += v2.getT2();
                t3 += v2.getT3();
                t4 += v2.getT4();
            }
            TrafficWritable v3 = new TrafficWritable(t1, t2, t3, t4);
            context.write(k2, v3);
        }
    }

    public static class TrafficWritable implements Writable {
        private long t1;
        private long t2;
        private long t3;
        private long t4;
        // Two constructors: one with arguments and one without.
        // The no-arg constructor is required, otherwise the job fails at runtime.

        public TrafficWritable() {
            super();
        }

        public TrafficWritable(long t1, long t2, long t3, long t4) {
            super();
            this.t1 = t1;
            this.t2 = t2;
            this.t3 = t3;
            this.t4 = t4;
        }

        // The values read from the text file are strings, so provide a String constructor as well.
        public TrafficWritable(String t1, String t2, String t3, String t4) {
            super();
            this.t1 = Long.parseLong(t1);
            this.t2 = Long.parseLong(t2);
            this.t3 = Long.parseLong(t3);
            this.t4 = Long.parseLong(t4);
        }

        public void write(DataOutput out) throws IOException {
            // serialize each member field
            out.writeLong(t1);
            out.writeLong(t2);
            out.writeLong(t3);
            out.writeLong(t4);
        }

        public void readFields(DataInput in) throws IOException {
            // deserialize the member fields
            this.t1 = in.readLong();
            this.t2 = in.readLong();
            this.t3 = in.readLong();
            this.t4 = in.readLong();
        }

        public long getT1() {
            return t1;
        }

        public void setT1(long t1) {
            this.t1 = t1;
        }

        public long getT2() {
            return t2;
        }

        public void setT2(long t2) {
            this.t2 = t2;
        }

        public long getT3() {
            return t3;
        }

        public void setT3(long t3) {
            this.t3 = t3;
        }

        public long getT4() {
            return t4;
        }

        public void setT4(long t4) {
            this.t4 = t4;
        }

        @Override
        public String toString() {
            return t1 + "\t" + t2 + "\t" + t3 + "\t" + t4;
        }

    }
}
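For reference, the default partitioner mentioned in the comment above does essentially the following; this sketch mirrors the logic of Hadoop's org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, which the custom TrafficPartitioner above replaces with a phone-number modulo.

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the default partitioner's logic: the key's hashCode, masked to stay
// non-negative, modulo the number of reduce tasks, picks the target reducer.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}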
This post was reprinted from SummerChill's cnblogs blog. Original link: http://www.cnblogs.com/DreamDrive/p/6260491.html. Please contact the original author before republishing.