I: What serialization is
Serialization is the process of turning a structured object into a byte stream.
Deserialization is the reverse process: turning a byte stream back into a structured object.
Java has its own built-in serialization (java.io.Serializable).
II: Characteristics of Hadoop serialization
(1): Desirable properties of a serialization format:
Compact: makes efficient use of storage space.
Fast: little extra overhead when reading and writing data.
Extensible: can transparently read data written in an older format.
Interoperable: supports interaction across multiple languages.
(2): Hadoop's serialization format: the Writable interface.
III: What Hadoop serialization is used for:
(1): In a distributed environment serialization does two big jobs: interprocess communication and persistent storage.
(2): It is what Hadoop nodes use to talk to each other.

IV: The Writable interface (classes that need to be serialized implement it)
(1) Writable is a simple, efficient serialization protocol built on DataInput and DataOutput.
(2) Every key and value type used in MapReduce must implement Writable.
(3) Every key type used in MapReduce must also implement WritableComparable.
1: Create a FlowBean entity class that implements Hadoop serialization:
1 package com.flowSum;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6
7 import org.apache.hadoop.io.Writable;
8
9 /***
10 *
11 * @author Administrator
12 * 1: write serializes this object to the output stream
13 * 2: readFields deserializes this object back from the bytes of the input stream
14 * 3: implement WritableComparable (needed when the bean is used as a MapReduce key)
15 * Comparing Java value objects usually also means overriding toString(), hashCode() and equals()
16 *
17 */
18 public class FlowBean implements Writable{
19
20 private String phoneNumber;//phone number
21 private long upFlow;//upstream traffic
22 private long downFlow;//downstream traffic
23 private long sumFlow;//total traffic
24
25
26
27 public String getPhoneNumber() {
28 return phoneNumber;
29 }
30 public void setPhoneNumber(String phoneNumber) {
31 this.phoneNumber = phoneNumber;
32 }
33 public long getUpFlow() {
34 return upFlow;
35 }
36 public void setUpFlow(long upFlow) {
37 this.upFlow = upFlow;
38 }
39 public long getDownFlow() {
40 return downFlow;
41 }
42 public void setDownFlow(long downFlow) {
43 this.downFlow = downFlow;
44 }
45 public long getSumFlow() {
46 return sumFlow;
47 }
48 public void setSumFlow(long sumFlow) {
49 this.sumFlow = sumFlow;
50 }
51
52 //a constructor with arguments, for convenient initialization of the object
53 public FlowBean(String phoneNumber, long upFlow, long downFlow) {
54 this.phoneNumber = phoneNumber;
55 this.upFlow = upFlow;
56 this.downFlow = downFlow;
57 this.sumFlow = upFlow + downFlow;
58 }
59 //deserialization creates the object through reflection, which needs a no-argument constructor, so one is defined here
60 public FlowBean() {
61 }
62
63 //override toString()
64 @Override
65 public String toString() {
66 return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
67 }
68
69
70 //deserialize this object's data from the input stream
71 //fields must be read in exactly the same order they were written
72 @Override
73 public void readFields(DataInput in) throws IOException {
74 phoneNumber = in.readUTF();
75 upFlow = in.readLong();
76 downFlow = in.readLong();
77 sumFlow = in.readLong();
78
79 }
80
81 //serialize this object's data to the output stream
82 @Override
83 public void write(DataOutput out) throws IOException {
84 out.writeUTF(phoneNumber);
85 out.writeLong(upFlow);
86 out.writeLong(downFlow);
87 out.writeLong(sumFlow);
88
89 }
90
91
92 }
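To check that write and readFields really are mirror images of each other, here is a minimal round-trip sketch outside MapReduce (the test class is mine, not from the original post, and it assumes it sits in the same package as FlowBean):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        //serialize a FlowBean into a byte array via write(DataOutput)
        FlowBean original = new FlowBean("13726230503", 2481, 24681);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        //deserialize it back via readFields(DataInput); the fields come back
        //in exactly the order they were written
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        //toString() prints upFlow, downFlow and sumFlow separated by tabs
        System.out.println(copy);   //expected: 2481  24681  27162
    }
}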
Create a FlowSumMapper class extending the Mapper class:
1 package com.flowSum;
2
3 import java.io.IOException;
4
5 import org.apache.commons.lang.StringUtils;
6 import org.apache.hadoop.io.LongWritable;
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.mapreduce.Mapper;
9 /***
10 *
11 * @author Administrator
12 * 1: FlowBean is a data type we defined ourselves; to travel between Hadoop nodes it has to follow Hadoop serialization,
13 * so it must implement the corresponding Hadoop serialization interface
14 * 2: Text can be thought of as the Writable equivalent of java.lang.String, backed by a UTF-8 byte sequence.
15 */
16 public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
17
18 //take one line of the log, split it into fields and pull out the ones we need: phone number, upstream traffic, downstream traffic
19 //then wrap them into a key-value pair and emit it
20
21 @Override
22 protected void map(LongWritable key, Text value, Context context)
23 throws IOException, InterruptedException {
24 //grab one line of input
25 String line = value.toString();
26 //split it into fields
27 String[] fields = StringUtils.split(line, "\t");
28 //phone number field
29 String phoneNumber = fields[1];
30 //upstream traffic field
31 long up_flow = Long.parseLong(fields[7]);
32 //downstream traffic field
33 long down_flow = Long.parseLong(fields[8]);
34
35 //finally, wrap the data into a key-value pair and write it out
36 context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
37
38 }
39
40 }
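One thing worth noting (my observation, not something the original code does): the mapper assumes every line has at least nine fields, because it reads fields[1], fields[7] and fields[8]. A hedged defensive sketch, placed right after the split inside map(), would skip malformed lines instead of letting the whole task fail:

//hypothetical guard, not in the original mapper
if (fields == null || fields.length < 9) {
    //skip records that are too short instead of letting
    //fields[7]/fields[8] throw ArrayIndexOutOfBoundsException
    return;
}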
Create a FlowSumReducer class extending the Reducer class:
1 package com.flowSum;
2
3 import java.io.IOException;
4
5 import org.apache.hadoop.io.Text;
6 import org.apache.hadoop.mapreduce.Reducer;
7
8 public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
9
10 //the framework calls our reduce method once for every group <phoneNumber, {flowbean, flowbean, flowbean...}>
11 //the reduce logic is simply to iterate over the values, sum them up and write the result out
12 @Override
13 protected void reduce(Text key, Iterable<FlowBean> values, Context context)
14 throws IOException, InterruptedException {
15 //counters for upstream and downstream traffic
16 long up_flow_counter = 0;
17 long down_flow_counter = 0;
18
19 //accumulate the upstream and downstream traffic
20 for(FlowBean bean : values){
21 up_flow_counter += bean.getUpFlow();
22 down_flow_counter += bean.getDownFlow();
23 }
24
25 //write out the result
26 context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
27
28 }
29
30 }
Create a FlowSumRunner class that extends Configured and implements Tool, the standard way to describe and submit a job (keep an eye on the Text import in this listing; it comes back to bite us below):
1 package com.flowSum;
2
3 import org.apache.hadoop.conf.Configuration;
4 import org.apache.hadoop.conf.Configured;
5 import org.apache.hadoop.fs.Path;
6 import org.apache.hadoop.mapreduce.Job;
7 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
8 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
9 import org.apache.hadoop.util.Tool;
10 import org.apache.hadoop.util.ToolRunner;
11
12 import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
13 /***
14 *
15 * @author Administrator
16 * 1: the standard pattern for a job description and submission class
17 */
18 public class FlowSumRunner extends Configured implements Tool{
19
20
21 @Override
22 public int run(String[] args) throws Exception {
23 //create the configuration
24 Configuration conf = new Configuration();
25 //get a job instance
26 Job job = Job.getInstance(conf);
27
28 //tell Hadoop which jar contains the classes this job uses
29 job.setJarByClass(FlowSumRunner.class);
30
31 //the mapper and reducer classes used by this job
32 job.setMapperClass(FlowSumMapper.class);
33 job.setReducerClass(FlowSumReducer.class);
34
35 //key-value types of the mapper output
36 job.setMapOutputKeyClass(Text.class);
37 job.setMapOutputValueClass(FlowBean.class);
38
39 //key-value types of the reducer output
40 job.setOutputKeyClass(Text.class);
41 job.setOutputValueClass(FlowBean.class);
42
43 //input path of the data to process
44 //FileInputFormat is the base class of all InputFormat implementations that read from files;
45 //it keeps track of all the files that make up the job input and knows how to compute the input splits.
46 //Turning a split into records is left to subclasses such as TextInputFormat.
47 FileInputFormat.setInputPaths(job, new Path(args[0]));
48
49 //output path for the results
50 FileOutputFormat.setOutputPath(job, new Path(args[1]));
51
52 //submit the job to the cluster and wait for it to finish
53 //job.waitForCompletion(true);
54 //returns 0 on success, 1 otherwise
55 return job.waitForCompletion(true) ? 0 : 1;
56 }
57
58 public static void main(String[] args) throws Exception {
59 //the standard ToolRunner invocation
60 int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
61 //exit with the job's status code
62 System.exit(res);
63 }
64
65 }
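A side note on the Runner (my reading, not stated in the original post): run() builds a brand-new Configuration instead of reusing the one ToolRunner has already parsed the command line into, which is most likely why the job log further down still prints the warning "Hadoop command-line option parsing not performed". A minimal sketch of the usual pattern:

@Override
public int run(String[] args) throws Exception {
    //reuse the Configuration that ToolRunner injected via setConf(),
    //so generic options such as -D key=value are actually honoured
    Job job = Job.getInstance(getConf());
    //... the rest of the job setup stays exactly as above
    return job.waitForCompletion(true) ? 0 : 1;
}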
Then package the job into a jar and upload it to the VM along with some test data (steps omitted). The test data looks like this:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
You can see that the jar and the test data have been uploaded to the VM:

Next, upload the data to the HDFS cluster (a pseudo-distributed cluster here):
Create an empty flow directory on the cluster, create a data directory inside it, and put the data file into that data directory, roughly along the lines of the commands sketched below:
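Something along these lines should do it (the local file name flow.log is a placeholder; the original post does not say what the data file is called):

hadoop fs -mkdir -p /flow/data
hadoop fs -put flow.log /flow/data
hadoop fs -ls /flow/data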

Then run the program. It takes arguments, so note that the last two items on the command line are the input and output paths:

And it promptly threw an error that left me completely baffled:
Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
at java.lang.Class.asSubclass(Class.java:3165)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Working on the theory that whatever you are learning now has already been learned (and broken) by somebody else, and that some kind soul has posted the error online, a quick search turned up the answer:
It turned out the wrong package had been imported for Text (be careful with this one, or it will cost you dearly):
Wrong: import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
Right: import org.apache.hadoop.io.Text;
Repackage, upload to the VM and run again, and you will hit this error:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
So delete the /flow/output directory (for example with hadoop fs -rm -r /flow/output); the output directory is created by the job itself and must not already exist:
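If you would rather not delete the directory by hand on every run, a common addition to run() (not in the original Runner, so treat it as a sketch) is to remove the output path programmatically before submitting; it needs an extra import of org.apache.hadoop.fs.FileSystem:

//hypothetical cleanup, placed before FileOutputFormat.setOutputPath(job, new Path(args[1]))
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
    fs.delete(outPath, true);   //true = delete recursively
}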

Finally run the program again (it takes arguments, so again the last two items are the input and output paths):
This time it threw an array-index-out-of-bounds exception; I suspected the test data was not clean:
Error: java.lang.ArrayIndexOutOfBoundsException: 1
at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
So I built a data file by hand, shown below:
(As it turned out, the original test data ran fine when I tested it again later. The moral is to run the test several times; traps are everywhere!)
1363157985066 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 241 200
1363157985061 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985062 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 681 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 4681 200
1363157985064 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 5481 4681 200
1363157985065 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 6481 2681 200
1363157985066 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 7481 2481 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 8481 2461 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 281 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 2681 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 24681 200
1363157985069 13726230509 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 681 200
1363157985060 13726230500 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 24681 200
1363157985061 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 81 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985063 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985064 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 2681 200
1363157985065 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 81 24681 200
1363157985067 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 241 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 681 200
1363157985068 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 241 681 200
The fix at the time was to change line 27 from String[] fields = StringUtils.split(line, "\t"); to String[] fields = StringUtils.split(line, " ");
(Later testing showed that String[] fields = StringUtils.split(line, "\t"); also works. At first I thought the kind of whitespace in the data mattered; the code was fine, the problem was the test data. See the short split demo after the updated mapper listing below.)
1 package com.flowSum;
2
3 import java.io.IOException;
4
5 import org.apache.commons.lang.StringUtils;
6 import org.apache.hadoop.io.LongWritable;
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.mapreduce.Mapper;
9 /***
10 *
11 * @author Administrator
12 * 1: FlowBean is a data type we defined ourselves; to travel between Hadoop nodes it has to follow Hadoop serialization,
13 * so it must implement the corresponding Hadoop serialization interface
14 * 2: Text can be thought of as the Writable equivalent of java.lang.String, backed by a UTF-8 byte sequence.
15 */
16 public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
17
18 //take one line of the log, split it into fields and pull out the ones we need: phone number, upstream traffic, downstream traffic
19 //then wrap them into a key-value pair and emit it
20
21 @Override
22 protected void map(LongWritable key, Text value, Context context)
23 throws IOException, InterruptedException {
24 //grab one line of input
25 String line = value.toString();
26 //split it into fields
27 String[] fields = StringUtils.split(line, " ");
28 //phone number field
29 String phoneNumber = fields[1];
30 //upstream traffic field
31 long up_flow = Long.parseLong(fields[7]);
32 //downstream traffic field
33 long down_flow = Long.parseLong(fields[8]);
34
35 //finally, wrap the data into a key-value pair and write it out
36 context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
37
38 }
39
40 }
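As promised above, a small standalone sketch (mine, not from the post) of how commons-lang StringUtils.split behaves: the second argument is treated as a set of separator characters and adjacent separators never produce empty tokens, so which delimiter "works" depends entirely on what whitespace the data file actually contains:

import java.util.Arrays;

import org.apache.commons.lang.StringUtils;

public class SplitDemo {
    public static void main(String[] args) {
        //a made-up record using tabs between fields (assumption for illustration)
        String tabLine = "1363157985066\t13726230503\t2481\t24681";
        //splitting on the character that is really there yields 4 fields
        System.out.println(Arrays.toString(StringUtils.split(tabLine, "\t")));
        //splitting on a character that is not there yields a single field,
        //and indexing fields[1] would then throw ArrayIndexOutOfBoundsException
        System.out.println(Arrays.toString(StringUtils.split(tabLine, " ")));
    }
}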
Package it, upload it to the VM and run it. A successful run looks like this:
[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job: map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job: map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job: map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1179
FILE: Number of bytes written=187971
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2467
HDFS: Number of bytes written=279
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2691
Total time spent by all reduces in occupied slots (ms)=2582
Total time spent by all map tasks (ms)=2691
Total time spent by all reduce tasks (ms)=2582
Total vcore-seconds taken by all map tasks=2691
Total vcore-seconds taken by all reduce tasks=2582
Total megabyte-seconds taken by all map tasks=2755584
Total megabyte-seconds taken by all reduce tasks=2643968
Map-Reduce Framework
Map input records=23
Map output records=23
Map output bytes=1127
Map output materialized bytes=1179
Input split bytes=93
Combine input records=0
Combine output records=0
Reduce input groups=10
Reduce shuffle bytes=1179
Reduce input records=23
Reduce output records=10
Spilled Records=46
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=126
CPU time spent (ms)=1240
Physical memory (bytes) snapshot=218099712
Virtual memory (bytes) snapshot=726839296
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2374
File Output Format Counters
Bytes Written=279
[root@master hadoop]#
The output looks like this:

In short, when you are learning something new, all kinds of errors are unavoidable; settle down and work through them.
2: Flow-sum sorting, a follow-up exercise:
This time the Mapper and Reducer are written as static inner classes. (I ran into the same maddening problem as above: with String[] fields = StringUtils.split(line, "\t"); the job just would not run and kept throwing ArrayIndexOutOfBoundsException; after switching to String[] fields = StringUtils.split(line, " "); it ran. Still baffling.)
1 package com.flowSort;
2
3 import java.io.IOException;
4
5 import org.apache.commons.lang.StringUtils;
6 import org.apache.hadoop.conf.Configuration;
7 import org.apache.hadoop.fs.Path;
8 import org.apache.hadoop.io.LongWritable;
9 import org.apache.hadoop.io.NullWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16
17
18 public class FlowSortMapReduce {
19
20 /***
21 * Mapper as a static inner class
22 * @author Administrator
23 *
24 */
25 public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
26
27 //take one line, split it into fields, wrap them in a FlowBean and emit the bean as the key
28 @Override
29 protected void map(LongWritable key, Text value,Context context)
30 throws IOException, InterruptedException {
31 //grab one line of input
32 String line = value.toString();
33 //split the line into fields
34 String[] fields = StringUtils.split(line, " ");
35
36 //pull the values we need out of the fields
37 String phoneNumber = fields[0];
38 long up_flow = Long.parseLong(fields[1]);
39 long down_flow = Long.parseLong(fields[2]);
40
41 //wrap the data and hand it on to the reducer
42 context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
43 }
44
45 }
46
47 /***
48 * Reducer as a static inner class
49 * @author Administrator
50 *
51 */
52 public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{
53
54 @Override
55 protected void reduce(FlowBean key, Iterable<NullWritable> values,Context context)
56 throws IOException, InterruptedException {
57
58 String phoneNumber = key.getPhoneNumber();
59 context.write(new Text(phoneNumber), key);
60 }
61 }
62
63
64 /***
65 * main method
66 * @param args
67 * @throws InterruptedException
68 * @throws IOException
69 * @throws ClassNotFoundException
70 */
71 public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
72 //create the configuration
73 Configuration conf = new Configuration();
74 //get a job instance
75 Job job = Job.getInstance(conf);
76
77 //tell Hadoop which jar contains the classes this job uses
78 job.setJarByClass(FlowSortMapReduce.class);
79
80 //the mapper and reducer classes used by this job
81 job.setMapperClass(FlowSortMapper.class);
82 job.setReducerClass(FlowSortReducer.class);
83
84 //key-value types of the mapper output
85 job.setMapOutputKeyClass(FlowBean.class);
86 job.setMapOutputValueClass(NullWritable.class);
87
88 //key-value types of the reducer output (the key is Text)
89 job.setOutputKeyClass(Text.class);
90 job.setOutputValueClass(FlowBean.class);
91
92 //input path of the data to process
93 //FileInputFormat is the base class of all InputFormat implementations that read from files;
94 //it keeps track of all the files that make up the job input and knows how to compute the input splits.
95 //Turning a split into records is left to subclasses such as TextInputFormat.
96 FileInputFormat.setInputPaths(job, new Path(args[0]));
97
98 //output path for the results
99 FileOutputFormat.setOutputPath(job, new Path(args[1]));
100
101 //submit the job to the cluster and wait for it to finish
102 //job.waitForCompletion(true);
103 //exit with 0 on success, 1 otherwise
104 System.exit(job.waitForCompletion(true) ? 0 : 1);
105 }
106
107 }
The entity class is reworked so that records can be sorted by total traffic:
1 package com.flowSort;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6
7 import org.apache.hadoop.io.Writable;
8 import org.apache.hadoop.io.WritableComparable;
9
10 /***
11 *
12 * @author Administrator
13 * 1: write serializes this object to the output stream
14 * 2: readFields deserializes this object back from the bytes of the input stream
15 * 3: implements WritableComparable so the bean can be used as a MapReduce key and sorted
16 * Comparing Java value objects usually also means overriding toString(), hashCode() and equals()
17 *
18 */
19 public class FlowBean implements WritableComparable<FlowBean>{
20
21
22 private String phoneNumber;//phone number
23 private long upFlow;//upstream traffic
24 private long downFlow;//downstream traffic
25 private long sumFlow;//total traffic
26
27
28
29 public String getPhoneNumber() {
30 return phoneNumber;
31 }
32 public void setPhoneNumber(String phoneNumber) {
33 this.phoneNumber = phoneNumber;
34 }
35 public long getUpFlow() {
36 return upFlow;
37 }
38 public void setUpFlow(long upFlow) {
39 this.upFlow = upFlow;
40 }
41 public long getDownFlow() {
42 return downFlow;
43 }
44 public void setDownFlow(long downFlow) {
45 this.downFlow = downFlow;
46 }
47 public long getSumFlow() {
48 return sumFlow;
49 }
50 public void setSumFlow(long sumFlow) {
51 this.sumFlow = sumFlow;
52 }
53
54 //a constructor with arguments, for convenient initialization of the object
55 public FlowBean(String phoneNumber, long upFlow, long downFlow) {
56 this.phoneNumber = phoneNumber;
57 this.upFlow = upFlow;
58 this.downFlow = downFlow;
59 this.sumFlow = upFlow + downFlow;
60 }
61 //deserialization creates the object through reflection, which needs a no-argument constructor, so one is defined here
62 public FlowBean() {
63 }
64
65 //override toString()
66 @Override
67 public String toString() {
68 return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
69 }
70
71
72 //deserialize this object's data from the input stream
73 //fields must be read in exactly the same order they were written
74 @Override
75 public void readFields(DataInput in) throws IOException {
76 phoneNumber = in.readUTF();
77 upFlow = in.readLong();
78 downFlow = in.readLong();
79 sumFlow = in.readLong();
80
81 }
82
83 //serialize this object's data to the output stream
84 @Override
85 public void write(DataOutput out) throws IOException {
86 out.writeUTF(phoneNumber);
87 out.writeLong(upFlow);
88 out.writeLong(downFlow);
89 out.writeLong(sumFlow);
90
91 }
92
93 //comparison method used to sort by traffic
94 @Override
95 public int compareTo(FlowBean o) {
96
97 //return -1 when larger, 1 when smaller or equal, giving a descending sort
98 return sumFlow > o.sumFlow ? -1 : 1;
99 }
100
101
102
103 }
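One caveat about this compareTo (my note, not from the original): it never returns 0, not even for two beans with the same total, which is fine for producing a descending sort here but is inconsistent with equals() and can surprise you wherever a proper total ordering is expected. A slightly more conventional hedged sketch:

//hedged alternative, not the original implementation:
//descending by total traffic, with the phone number as a tie-breaker
@Override
public int compareTo(FlowBean o) {
    int bySum = Long.compare(o.sumFlow, this.sumFlow);  //reversed => descending
    return bySum != 0 ? bySum : this.phoneNumber.compareTo(o.phoneNumber);
}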
The result looks like this. In short, the problems just kept coming:
[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job: map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job: map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job: map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=822
FILE: Number of bytes written=187379
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=635
HDFS: Number of bytes written=526
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2031
Total time spent by all reduces in occupied slots (ms)=2599
Total time spent by all map tasks (ms)=2031
Total time spent by all reduce tasks (ms)=2599
Total vcore-seconds taken by all map tasks=2031
Total vcore-seconds taken by all reduce tasks=2599
Total megabyte-seconds taken by all map tasks=2079744
Total megabyte-seconds taken by all reduce tasks=2661376
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=774
Map output materialized bytes=822
Input split bytes=109
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=822
Reduce input records=21
Reduce output records=21
Spilled Records=42
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=121
CPU time spent (ms)=700
Physical memory (bytes) snapshot=218284032
Virtual memory (bytes) snapshot=726839296
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=526
File Output Format Counters
Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r-- 1 root supergroup 0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r-- 1 root supergroup 526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888 2481 24681 27162
13726230503 2481 24681 27162
13925057413 63 11058 11121
18320173382 18 9531 9549
13502468823 102 7335 7437
13660577991 9 6960 6969
13922314466 3008 3720 6728
13560439658 5892 400 6292
84138413 4116 1432 5548
15013685858 27 3659 3686
15920133257 20 3156 3176
13602846565 12 1938 1950
15989002119 3 1938 1941
13926435656 1512 200 1712
18211575961 12 1527 1539
13560436666 954 200 1154
13480253104 180 200 380
13760778710 120 200 320
13826544101 0 200 200
13926251106 0 200 200
13719199419 0 200 200
[root@master hadoop]#