【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

简介: 【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

文章目录


1 序列化概述

2 自定义bean对象实现序列化接口(Writable)

3 序列化案例实操


1 序列化概述


20191211104446597.png


20191211104514875.png



2 自定义bean对象实现序列化接口(Writable)


在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。


具体实现bean对象序列化步骤如下7步。


(1)必须实现Writable接口


(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
  super();
}


(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
  out.writeLong(upFlow);
  out.writeLong(downFlow);
  out.writeLong(sumFlow);
}


(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
  upFlow = in.readLong();
  downFlow = in.readLong();
  sumFlow = in.readLong();
}


(5)注意反序列化的顺序和序列化的顺序完全一致


(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。


(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

@Override
public int compareTo(FlowBean o) {
  // 倒序排列,从大到小
  return this.sumFlow > o.getSumFlow() ? -1 : 1;
}


3 序列化案例实操


1.需求

统计每一个手机号耗费的总上行流量、下行流量、总流量


(1)输入数据格式:

7  13560436666  120.196.100.99  1116   954    200
id  手机号码  网络ip    上行流量  下行流量     网络状态码


(2)期望输出数据格式

13560436666   1116        954    2070
手机号码      上行流量        下行流量  总流量


2.需求分析


20191211104555528.png


3.编写MapReduce程序


(1)编写流量统计的Bean对象

package com.kgc.phone;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:序列化案例实操
 */
//1.  需求 : 统计每一个手机号耗费的总上行流量、下行流量、总流量
    //输入数据格式:
/*7  13560436666  120.196.100.99  1116     954  200
id     手机号码    网络ip     上行流量  下行流量     网络状态码*/
    //期望输出数据格式
/*13560436666   1116        954    2070
        手机号码      上行流量        下行流量  总流量*/
    // 1 实现writable接口
public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    //2  反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }
    //3  写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    //4 反序列化方法
    //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
    // 6 编写toString方法,方便后续打印到文本
    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}


(2)编写Mapper类

package com.kgc.phone;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    FlowBean v = new FlowBean();
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
        // 2 切割字段
        String[] fields = line.split("\t");
        // 3 封装对象
        // 取出手机号码
        String phoneNum = fields[1];
        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length-3]);
        long downFlow = Long.parseLong(fields[fields.length-2]);
        k.set(phoneNum);
        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        // 4 写出
        context.write(k,v);
    }
}


(3)编写Reducer类

package com.kgc.phone;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sun_downFlow = 0;
        // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            sum_upFlow+=flowBean.getUpFlow();
            sun_downFlow+=flowBean.getDownFlow();
        }
        // 2 封装对象
        FlowBean resultBean = new FlowBean(sum_upFlow,sun_downFlow);
        // 3 写出
        context.write(key,resultBean);
    }
}


(4)编写Driver驱动类

package com.kgc.phone;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowsumDriver {
    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);
        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 5 指定job的输入原始文件所在目录
        FileInputFormat.addInputPath(job,new Path("hdfs://192.168.56.137:9000/data2/phone.txt"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.56.137:9000/my6"));
        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        job.waitForCompletion(true);
    }
}


目录
相关文章
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
66 2
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
98 3
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
45 2
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
87 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
40 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
48 0
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
存储 分布式计算 Hadoop
Hadoop基础学习---6、MapReduce框架原理(一)
Hadoop基础学习---6、MapReduce框架原理(一)
|
存储 分布式计算 Hadoop
【Hadoop】一个例子带你了解MapReduce
【Hadoop】一个例子带你了解MapReduce
98 1
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)

相关实验场景

更多
下一篇
无影云桌面