前言:
序列化概述:
(1)什么是序列化?
序列化就是把内存中的对象,转换成字节序列(或其他数据协议)以便于存储到磁盘(持久化)和网络传输。
(2)什么是反序列化?
反序列化就是将收到的字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
(3)为什么要序列化?
一般情况,【活的】对象只生存在内存里,关机断电就没有了,而且【活的】对象只能由本地的进程使用,不能被发送到网络上的另一台计算机。然而序列化可以存储【活的】对象,可以将【活的】对象发送到远程计算机。
(4)为什么不用Java的序列化?
Java的序列化是一个重量级序列化框架,一个对象被序列化后,会附带很多额外信息,不便于在网络中高效传输。所以Hadoop开发了一套自己的序列化机制。
(5)Hadoop序列化特点?
紧凑:高效使用存储空间
快速:读写数据的额外开销小
可扩展性:随着通信协议的升级而升级
互操作:支持多语言的交互
案例分析:
FlowData案例主要实现数据的提取与计算,使用MapReduce来实现对数据的准确提取与相关计算。每一个手机号有其相对应的标号,手机号,网址IP,网址域名,上行流量,下行流量,状态码,其字段个数不一定相同,标号对应的手机号码也不唯一。本案例预期实现提取手机号,上行流量和下行流量及其计算出总流量,其余字段不予提取。
思路扩展:
手机号码作为K,上行和下行流量及其总流量封装成Bean对象作为Value输出,Bean对象为自定义对象则必须实现序列化接口,累加上行流量和下行流量得到总流量。
输入数据:
期望输出数据:
程序编写:
(1)编写FlowBean类:
package org.example.mapreduce.writable02; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /* 定义实现的writable方法 重写序列化和反序列化方法 重写空参构造 重写tostring方法 */ public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; public FlowBean() { } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } }
(2)编写FlowMap类:
package org.example.mapreduce.writable02; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { private Text outK = new Text(); private FlowBean outV = new FlowBean(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split("\t"); String phone = split[1]; String up = split[split.length - 3]; String down = split[split.length - 2]; outK.set(phone); outV.setUpFlow(Long.parseLong(up)); outV.setDownFlow(Long.parseLong(down)); outV.setSumFlow(); context.write(outK, outV); } }
(3)编写FlowReducer类:
package org.example.mapreduce.writable02; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> { private FlowBean outV = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException { long totalUp = 0; long totalDown = 0; for (FlowBean value : values) { totalUp += value.getUpFlow(); totalDown += value.getDownFlow(); } outV.setUpFlow(totalUp); outV.setDownFlow(totalDown); outV.setSumFlow(); context.write(key, outV); } }
(4)编写FlowDriver类:
package org.example.mapreduce.writable02; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //设置jar包 job.setJarByClass(FlowDriver.class); //关联Mapper和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //设置Mapper的KV类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //设置最终输出的KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //设置输入路径和输出路径 FileInputFormat.setInputPaths(job,new Path("E:\\input\\inputflow")); FileOutputFormat.setOutputPath(job,new Path("E:\\output\\outputFlow")); //提交job boolean result = job.waitForCompletion(true); System.exit(result? 0:1); } }
数据分析:
输入数据和输出数据对比分析:
注意:输入数据中每一行的字段数不一定相同,且有空余字段,电话号码有相同号码却分为多行,此时需进行相同号码合并数据然后通过计算输出总流量,最后以电话号码数字大小作为排序输出。
程序步骤不做过多赘述,详情请参考:
MapReduce之wordcount案例(环境搭建及案例实施)
https://blog.csdn.net/m0_54925305/article/details/120155693?spm=1001.2014.3001.5502
发布MapReduce程序在集群上运行之wordcount案例实施
https://blog.csdn.net/m0_54925305/article/details/120315188?spm=1001.2014.3001.5502
MapReduce之FlowData案例(序列化案例实施)完成