【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

简介: 【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

文章目录


1 序列化概述

2 自定义bean对象实现序列化接口(Writable)

3 序列化案例实操


1 序列化概述


20191211104446597.png


20191211104514875.png



2 自定义bean对象实现序列化接口(Writable)


在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。


具体实现bean对象序列化步骤如下7步。


(1)必须实现Writable接口


(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
  super();
}


(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
  out.writeLong(upFlow);
  out.writeLong(downFlow);
  out.writeLong(sumFlow);
}


(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
  upFlow = in.readLong();
  downFlow = in.readLong();
  sumFlow = in.readLong();
}


(5)注意反序列化的顺序和序列化的顺序完全一致


(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。


(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

@Override
public int compareTo(FlowBean o) {
  // 倒序排列,从大到小
  return this.sumFlow > o.getSumFlow() ? -1 : 1;
}


3 序列化案例实操


1.需求

统计每一个手机号耗费的总上行流量、下行流量、总流量


(1)输入数据格式:

7  13560436666  120.196.100.99  1116   954    200
id  手机号码  网络ip    上行流量  下行流量     网络状态码


(2)期望输出数据格式

13560436666   1116        954    2070
手机号码      上行流量        下行流量  总流量


2.需求分析


20191211104555528.png


3.编写MapReduce程序


(1)编写流量统计的Bean对象

package com.kgc.phone;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:序列化案例实操
 */
//1.  需求 : 统计每一个手机号耗费的总上行流量、下行流量、总流量
    //输入数据格式:
/*7  13560436666  120.196.100.99  1116     954  200
id     手机号码    网络ip     上行流量  下行流量     网络状态码*/
    //期望输出数据格式
/*13560436666   1116        954    2070
        手机号码      上行流量        下行流量  总流量*/
    // 1 实现writable接口
public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    //2  反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }
    //3  写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    //4 反序列化方法
    //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
    // 6 编写toString方法,方便后续打印到文本
    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}


(2)编写Mapper类

package com.kgc.phone;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    FlowBean v = new FlowBean();
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
        // 2 切割字段
        String[] fields = line.split("\t");
        // 3 封装对象
        // 取出手机号码
        String phoneNum = fields[1];
        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length-3]);
        long downFlow = Long.parseLong(fields[fields.length-2]);
        k.set(phoneNum);
        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        // 4 写出
        context.write(k,v);
    }
}


(3)编写Reducer类

package com.kgc.phone;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sun_downFlow = 0;
        // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            sum_upFlow+=flowBean.getUpFlow();
            sun_downFlow+=flowBean.getDownFlow();
        }
        // 2 封装对象
        FlowBean resultBean = new FlowBean(sum_upFlow,sun_downFlow);
        // 3 写出
        context.write(key,resultBean);
    }
}


(4)编写Driver驱动类

package com.kgc.phone;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowsumDriver {
    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);
        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 5 指定job的输入原始文件所在目录
        FileInputFormat.addInputPath(job,new Path("hdfs://192.168.56.137:9000/data2/phone.txt"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.56.137:9000/my6"));
        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        job.waitForCompletion(true);
    }
}


目录
相关文章
|
5月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
40 1
|
20天前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
70 2
|
20天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
56 0
|
4月前
|
存储 分布式计算 监控
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
56 0
|
5月前
|
存储 Linux
[hadoop3.x]HDFS之银行海量转账数据分层案例(八)
[hadoop3.x]HDFS之银行海量转账数据分层案例(八)
110 1
|
5月前
|
存储 SQL 分布式计算
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
275 0
|
20天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
46 2
|
20天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
8天前
|
分布式计算 Hadoop 大数据
[大数据] mac 史上最简单 hadoop 安装过程
[大数据] mac 史上最简单 hadoop 安装过程
|
22天前
|
SQL 分布式计算 Hadoop
利用Hive与Hadoop构建大数据仓库:从零到一
【4月更文挑战第7天】本文介绍了如何使用Apache Hive与Hadoop构建大数据仓库。Hadoop的HDFS和YARN提供分布式存储和资源管理,而Hive作为基于Hadoop的数据仓库系统,通过HiveQL简化大数据查询。构建过程包括设置Hadoop集群、安装配置Hive、数据导入与管理、查询分析以及ETL与调度。大数据仓库的应用场景包括海量数据存储、离线分析、数据服务化和数据湖构建,为企业决策和创新提供支持。
66 1

热门文章

最新文章

相关实验场景

更多