I: Serialization concepts
Serialization is the process of turning structured objects into a byte stream.
Deserialization is the reverse of serialization, i.e., turning a byte stream back into structured objects.
Java has its own serialization mechanism (java.io.Serializable).
II: Characteristics of Hadoop serialization
(1) Properties of the serialization format:
Compact: uses storage space efficiently.
Fast: little extra overhead when reading and writing data.
Extensible: can transparently read data written in an older format.
Interoperable: supports interaction across multiple languages.
(2) Hadoop's serialization format: the Writable interface.
III: What Hadoop serialization is used for
(1) Serialization plays two major roles in a distributed environment: inter-process communication and permanent storage.
(2) Hadoop uses it for communication between nodes.
IV: The Writable interface (classes to be serialized implement this interface)
(1) The Writable interface is a simple, efficient serialization protocol built on DataInput and DataOutput.
(2) Any key or value type in MapReduce must implement the Writable interface.
(3) Any key type in MapReduce must also implement the WritableComparable interface.
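For reference, the two interfaces boil down to the following shape (paraphrased from org.apache.hadoop.io; this is only a reminder of the contract, not code we have to write ourselves):

public interface Writable {
    void write(DataOutput out) throws IOException;     // serialize the object's fields to the stream
    void readFields(DataInput in) throws IOException;  // read the fields back from the stream
}

public interface WritableComparable<T> extends Writable, Comparable<T> {
    // keys need an ordering in addition to serialization
}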
1: Create a FlowBean entity class and implement serialization:
package com.flowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/***
 * 1: write() serializes the object to the output stream
 * 2: readFields() deserializes the bytes read from the input stream
 * 3: to be usable as a key, implement WritableComparable
 *    For Java value objects you generally also override toString(), hashCode() and equals()
 */
public class FlowBean implements Writable {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor makes it convenient to initialize the object's data
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // During deserialization the framework instantiates the bean via reflection,
    // which requires a no-arg constructor
    public FlowBean() {
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's data from the stream;
    // fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's data to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
}
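Before wiring the bean into a job, it is easy to sanity-check write()/readFields() with a local round trip through a byte array. This is a small sketch of my own (the class name FlowBeanRoundTrip and the sample values are made up), not part of the original code:

package com.flowSum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal local round-trip test of FlowBean's write()/readFields(),
// just to verify that the field order matches in both directions.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // serialize to a byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize from the same bytes
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // both lines should print "2481	24681	27162"
        System.out.println(original);
        System.out.println(copy);
    }
}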
Create a FlowSumMapper class that extends Mapper:
package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 * 1: FlowBean is a custom data type of ours; to be shipped between Hadoop nodes it has to follow
 *    Hadoop's serialization scheme, so it must implement the corresponding Hadoop interface.
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split it into fields, pull out the ones we need
    // (phone number, upstream traffic, downstream traffic) and emit them as a key-value pair
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // one line of input
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // phone number field
        String phoneNumber = fields[1];
        // upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // finally, wrap the data in a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}
Create a FlowSumReducer class that extends Reducer:
package com.flowSum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // The framework calls our reduce method once per group <phoneNumber, {flowbean, flowbean, ...}>.
    // The business logic is simply to iterate over the values, sum them up and emit the result.
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // counters for upstream and downstream traffic
        long up_flow_counter = 0;
        long down_flow_counter = 0;

        // accumulate upstream and downstream traffic
        for (FlowBean bean : values) {
            up_flow_counter += bean.getUpFlow();
            down_flow_counter += bean.getDownFlow();
        }

        // emit the result
        context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
    }
}
Create a FlowSumRunner class that extends Configured and implements Tool (the standard pattern for describing and submitting a job):
package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text; // wrong Text import -- the cause of the error shown below

/***
 * 1: the standard pattern for describing and submitting a job
 */
public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job instance
        Job job = Job.getInstance(conf);

        // tell the job which jar contains all the classes it uses
        job.setJarByClass(FlowSumRunner.class);

        // the mapper and reducer classes this job uses
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // the path of the input data to process
        // FileInputFormat is the base class of all InputFormat implementations that use files as the data source;
        // it keeps track of all the files that form the job's input and implements the logic for computing input splits.
        // How individual records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // the path where the results will be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster and wait for it to finish;
        // returns 0 on success, 1 otherwise
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // the conventional way to launch a Tool
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        // exit when finished
        System.exit(res);
    }
}
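A small side note of my own (an assumption, not from the original post): since FlowSumRunner already extends Configured and is launched through ToolRunner, run() can build the Job from getConf() instead of a brand-new Configuration. That lets ToolRunner's generic option parsing (-D, -files, ...) take effect and should silence the "Hadoop command-line option parsing not performed" warning that shows up in the job output further down. Sketch of the alternative run(), assuming the corrected org.apache.hadoop.io.Text import that is discussed below:

// Hypothetical variant of run(): reuse the configuration ToolRunner prepared
@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJarByClass(FlowSumRunner.class);
    job.setMapperClass(FlowSumMapper.class);
    job.setReducerClass(FlowSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}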
Then package the code and upload it to the VM together with some test data (the intermediate steps are omitted here); the test data is shown below:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
The jar and the test data have now been uploaded to the VM.
Next, upload the data to the HDFS cluster (a pseudo-distributed cluster in this case).
Create an empty /flow directory on the cluster, create a data directory inside it to hold the input, and put the data file into that data directory.
Then run the program; it takes arguments, so note that the last two items on the command line are the input and output paths (roughly as shown below).
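The commands look roughly like this (the local file name flow.log is a placeholder of mine; the original post only shows screenshots for these steps):

hadoop fs -mkdir -p /flow/data
hadoop fs -put flow.log /flow/data
hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output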
And then it threw an error like this, which left me completely baffled:
Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
at java.lang.Class.asSubclass(Class.java:3165)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Then, working on the theory that whatever you are learning now someone else has already learned, and that some kind soul has surely posted this exact error online, I searched for it and fixed the problem:
It turned out the wrong Text class had been imported (be careful here, or it will cost you dearly).
Wrong: import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
Right: import org.apache.hadoop.io.Text;
Repackage, upload to the VM and run again, and you hit this error:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
So delete the /flow/output directory, because the output directory is created automatically by the program and must not already exist:
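From the shell that is just hadoop fs -rm -r /flow/output. If you re-run jobs a lot, a small addition to run() saves the manual step; this is a convenience sketch of my own, not something from the original post:

// Hypothetical addition to run(): remove a stale output directory before submitting,
// so repeated runs don't die with FileAlreadyExistsException. It silently discards
// the previous results, so only use it when that is what you want.
// (needs import org.apache.hadoop.fs.FileSystem;)
Path outputPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);   // true = recursive delete
}
FileOutputFormat.setOutputPath(job, outputPath);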
Finally run the program again (it takes arguments, so remember that the last two items are the input and output paths):
This time it threw an array-index-out-of-bounds exception; I figured the test data was probably not clean:
Error: java.lang.ArrayIndexOutOfBoundsException: 1
at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
So I hand-crafted a data set, shown below:
(As it turned out, when I tested again later the original data set ran fine after all; in short, test a few times, the pitfalls are everywhere!!!)
1363157985066 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 241 200
1363157985061 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985062 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 681 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 4681 200
1363157985064 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 5481 4681 200
1363157985065 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 6481 2681 200
1363157985066 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 7481 2481 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 8481 2461 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 281 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 2681 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 24681 200
1363157985069 13726230509 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 681 200
1363157985060 13726230500 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 24681 200
1363157985061 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 81 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985063 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985064 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 2681 200
1363157985065 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 81 24681 200
1363157985067 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 241 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 681 200
1363157985068 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 241 681 200
Finally I changed String[] fields = StringUtils.split(line, "\t"); to String[] fields = StringUtils.split(line, " ");
(I later tested again and String[] fields = StringUtils.split(line, "\t"); works as well; at first I thought the amount of whitespace in the test data mattered. Note that commons-lang's StringUtils.split(String, String) treats its second argument as a set of separator characters and merges adjacent separators, so either delimiter works as long as it actually appears between the fields. The code was fine; the problem was the test data.)
package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 * 1: FlowBean is a custom data type of ours; to be shipped between Hadoop nodes it has to follow
 *    Hadoop's serialization scheme, so it must implement the corresponding Hadoop interface.
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split it into fields, pull out the ones we need
    // (phone number, upstream traffic, downstream traffic) and emit them as a key-value pair
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // one line of input
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, " ");
        // phone number field
        String phoneNumber = fields[1];
        // upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // finally, wrap the data in a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}
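If the data really is dirty, a small guard in map() keeps one malformed line from failing the whole task. This is a hardening sketch of my own, not part of the original code (the counter group/name "FlowSum"/"MALFORMED_LINES" is made up):

// Hypothetical defensive version of the body of map(): skip short lines instead of
// letting an ArrayIndexOutOfBoundsException kill the task, and count how many were skipped.
String[] fields = StringUtils.split(line, " ");
if (fields == null || fields.length < 9) {
    context.getCounter("FlowSum", "MALFORMED_LINES").increment(1);
    return;
}
String phoneNumber = fields[1];
long up_flow = Long.parseLong(fields[7]);
long down_flow = Long.parseLong(fields[8]);
context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));

Also worth noting: some of the original records carry an extra category column (视频网站, 搜索引擎, ...), so fixed indexes like fields[7] can land on different columns for different records; a common variant of this example indexes from the end of the array (fields[fields.length - 3], fields[fields.length - 2]) instead.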
Package it, upload it to the VM, and run it (a successful run produces output like the following):
[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job: map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job: map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job: map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1179
FILE: Number of bytes written=187971
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2467
HDFS: Number of bytes written=279
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2691
Total time spent by all reduces in occupied slots (ms)=2582
Total time spent by all map tasks (ms)=2691
Total time spent by all reduce tasks (ms)=2582
Total vcore-seconds taken by all map tasks=2691
Total vcore-seconds taken by all reduce tasks=2582
Total megabyte-seconds taken by all map tasks=2755584
Total megabyte-seconds taken by all reduce tasks=2643968
Map-Reduce Framework
Map input records=23
Map output records=23
Map output bytes=1127
Map output materialized bytes=1179
Input split bytes=93
Combine input records=0
Combine output records=0
Reduce input groups=10
Reduce shuffle bytes=1179
Reduce input records=23
Reduce output records=10
Spilled Records=46
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=126
CPU time spent (ms)=1240
Physical memory (bytes) snapshot=218099712
Virtual memory (bytes) snapshot=726839296
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2374
File Output Format Counters
Bytes Written=279
[root@master hadoop]#
Then view the output to check the results.
In short, when learning something new all kinds of errors are unavoidable; calm down and work through them.
2: Flow-sum sorting example:
This time the Mapper and Reducer are written as static inner classes. (I ran into the same maddening problem as above: with String[] fields = StringUtils.split(line, "\t"); it simply would not run and kept throwing array-index-out-of-bounds exceptions; after switching to String[] fields = StringUtils.split(line, " "); it ran. Still baffling.)
package com.flowSort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSortMapReduce {

    /***
     * Mapper as a static inner class
     */
    public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        // Take one line, split out the fields, wrap them in a FlowBean and emit the bean as the key
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // one line of input
            String line = value.toString();
            // split the line into fields
            String[] fields = StringUtils.split(line, " ");

            // pull the values out of the fields
            String phoneNumber = fields[0];
            long up_flow = Long.parseLong(fields[1]);
            long down_flow = Long.parseLong(fields[2]);

            // wrap the data in a FlowBean and pass it on to the reducer
            context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
        }
    }

    /***
     * Reducer as a static inner class
     */
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String phoneNumber = key.getPhoneNumber();
            context.write(new Text(phoneNumber), key);
        }
    }

    /***
     * main method
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job instance
        Job job = Job.getInstance(conf);

        // tell the job which jar contains all the classes it uses
        job.setJarByClass(FlowSortMapReduce.class);

        // the mapper and reducer classes this job uses
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // the path of the input data to process
        // FileInputFormat is the base class of all InputFormat implementations that use files as the data source;
        // it keeps track of all the files that form the job's input and implements the logic for computing input splits.
        // How individual records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // the path where the results will be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster; exit with 0 on success, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The entity class is reworked so that records are sorted by total traffic:
package com.flowSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/***
 * 1: write() serializes the object to the output stream
 * 2: readFields() deserializes the bytes read from the input stream
 * 3: WritableComparable is implemented so the bean can be used as a sortable key
 *    For Java value objects you generally also override toString(), hashCode() and equals()
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor makes it convenient to initialize the object's data
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // During deserialization the framework instantiates the bean via reflection,
    // which requires a no-arg constructor
    public FlowBean() {
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's data from the stream;
    // fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's data to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Comparison on total traffic
    @Override
    public int compareTo(FlowBean o) {
        // return -1 when larger, 1 otherwise, which gives a descending sort
        return sumFlow > o.sumFlow ? -1 : 1;
    }
}
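One thing to watch (my own note, not from the original post): this compareTo never returns 0, which technically breaks the Comparable contract when two beans have the same total, although it does keep equal totals from being grouped into one reduce call. A slightly more conventional variant keeps the contract and uses the phone number as a tie-breaker so equal totals still stay in separate groups; a minimal sketch:

// Hypothetical alternative compareTo: descending by total traffic, phone number breaks ties
@Override
public int compareTo(FlowBean o) {
    int byTotal = Long.compare(o.sumFlow, this.sumFlow);   // larger total sorts first
    return byTotal != 0 ? byTotal : this.phoneNumber.compareTo(o.phoneNumber);
}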
The result looks like this; in short, problems all the way:
[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job: map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job: map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job: map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=822
FILE: Number of bytes written=187379
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=635
HDFS: Number of bytes written=526
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2031
Total time spent by all reduces in occupied slots (ms)=2599
Total time spent by all map tasks (ms)=2031
Total time spent by all reduce tasks (ms)=2599
Total vcore-seconds taken by all map tasks=2031
Total vcore-seconds taken by all reduce tasks=2599
Total megabyte-seconds taken by all map tasks=2079744
Total megabyte-seconds taken by all reduce tasks=2661376
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=774
Map output materialized bytes=822
Input split bytes=109
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=822
Reduce input records=21
Reduce output records=21
Spilled Records=42
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=121
CPU time spent (ms)=700
Physical memory (bytes) snapshot=218284032
Virtual memory (bytes) snapshot=726839296
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=526
File Output Format Counters
Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r-- 1 root supergroup 0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r-- 1 root supergroup 526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888 2481 24681 27162
13726230503 2481 24681 27162
13925057413 63 11058 11121
18320173382 18 9531 9549
13502468823 102 7335 7437
13660577991 9 6960 6969
13922314466 3008 3720 6728
13560439658 5892 400 6292
84138413 4116 1432 5548
15013685858 27 3659 3686
15920133257 20 3156 3176
13602846565 12 1938 1950
15989002119 3 1938 1941
13926435656 1512 200 1712
18211575961 12 1527 1539
13560436666 954 200 1154
13480253104 180 200 380
13760778710 120 200 320
13826544101 0 200 200
13926251106 0 200 200
13719199419 0 200 200
[root@master hadoop]#