三十四、MapReduce之FlowData案例(序列化案例实施)

简介: 三十四、MapReduce之FlowData案例(序列化案例实施)

前言:


序列化概述:


       (1)什么是序列化?


      序列化就是把内存中的对象,转换成字节序列(或其他数据协议)以便于存储到磁盘(持久化)和网络传输。


       (2)什么是反序列化?


      反序列化就是将收到的字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。


       (3)为什么要序列化?


       一般情况,【活的】对象只生存在内存里,关机断电就没有了,而且【活的】对象只能由本地的进程使用,不能被发送到网络上的另一台计算机。然而序列化可以存储【活的】对象,可以将【活的】对象发送到远程计算机。


       (4)为什么不用Java的序列化?


       Java的序列化是一个重量级序列化框架,一个对象被序列化后,会附带很多额外信息,不便于在网络中高效传输。所以Hadoop开发了一套自己的序列化机制。


       (5)Hadoop序列化特点?


紧凑:高效使用存储空间

快速:读写数据的额外开销小

可扩展性:随着通信协议的升级而升级

互操作:支持多语言的交互


案例分析:


       FlowData案例主要实现数据的提取与计算,使用MapReduce来实现对数据的准确提取与相关计算。每一个手机号有其相对应的标号,手机号,网址IP,网址域名,上行流量,下行流量,状态码,其字段个数不一定相同,标号对应的手机号码也不唯一。本案例预期实现提取手机号,上行流量和下行流量及其计算出总流量,其余字段不予提取。


思路扩展:


       手机号码作为K,上行和下行流量及其总流量封装成Bean对象作为Value输出,Bean对象为自定义对象则必须实现序列化接口,累加上行流量和下行流量得到总流量。


输入数据:

72.png


期望输出数据:

73.png


程序编写:


(1)编写FlowBean类:


package org.example.mapreduce.writable02;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*
    定义实现的writable方法
    重写序列化和反序列化方法
    重写空参构造
    重写tostring方法
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    public FlowBean() {
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

(2)编写FlowMap类:


package org.example.mapreduce.writable02;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split("\t");
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();
        context.write(outK, outV);
    }
}

(3)编写FlowReducer类:


package org.example.mapreduce.writable02;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        context.write(key, outV);
    }
}

(4)编写FlowDriver类:


package org.example.mapreduce.writable02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //设置jar包
        job.setJarByClass(FlowDriver.class);
        //关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //设置Mapper的KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //设置最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //设置输入路径和输出路径
        FileInputFormat.setInputPaths(job,new Path("E:\\input\\inputflow"));
        FileOutputFormat.setOutputPath(job,new Path("E:\\output\\outputFlow"));
        //提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result? 0:1);
    }
}

数据分析:


       输入数据和输出数据对比分析:


74.png


75.png


        注意:输入数据中每一行的字段数不一定相同,且有空余字段,电话号码有相同号码却分为多行,此时需进行相同号码合并数据然后通过计算输出总流量,最后以电话号码数字大小作为排序输出。


     程序步骤不做过多赘述,详情请参考:


MapReduce之wordcount案例(环境搭建及案例实施)

https://blog.csdn.net/m0_54925305/article/details/120155693?spm=1001.2014.3001.5502


发布MapReduce程序在集群上运行之wordcount案例实施

https://blog.csdn.net/m0_54925305/article/details/120315188?spm=1001.2014.3001.5502


MapReduce之FlowData案例(序列化案例实施)完成


相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
101 3
|
7月前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
326 2
|
6月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
52 0
|
7月前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
|
7月前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
311 0
|
7月前
|
存储 C#
C#中的序列化和反序列化案例
C#中的序列化和反序列化案例
|
分布式计算 Java Hadoop
26 MAPREDUCE中的序列化
26 MAPREDUCE中的序列化
58 1
|
7月前
|
存储 分布式计算 Hadoop
MapReduce序列化【用户流量使用统计】
MapReduce序列化【用户流量使用统计】
|
存储 缓存 分布式计算
Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(二)
Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(二)
|
存储 缓存 分布式计算
Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(一)
Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(一)