【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

简介: 【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

文章目录


1 序列化概述

2 自定义bean对象实现序列化接口(Writable)

3 序列化案例实操


1 序列化概述


20191211104446597.png


20191211104514875.png



2 自定义bean对象实现序列化接口(Writable)


在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。


具体实现bean对象序列化步骤如下7步。


(1)必须实现Writable接口


(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
  super();
}


(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
  out.writeLong(upFlow);
  out.writeLong(downFlow);
  out.writeLong(sumFlow);
}


(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
  upFlow = in.readLong();
  downFlow = in.readLong();
  sumFlow = in.readLong();
}


(5)注意反序列化的顺序和序列化的顺序完全一致


(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。


(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

@Override
public int compareTo(FlowBean o) {
  // 倒序排列,从大到小
  return this.sumFlow > o.getSumFlow() ? -1 : 1;
}


3 序列化案例实操


1.需求

统计每一个手机号耗费的总上行流量、下行流量、总流量


(1)输入数据格式:

7  13560436666  120.196.100.99  1116   954    200
id  手机号码  网络ip    上行流量  下行流量     网络状态码


(2)期望输出数据格式

13560436666   1116        954    2070
手机号码      上行流量        下行流量  总流量


2.需求分析


20191211104555528.png


3.编写MapReduce程序


(1)编写流量统计的Bean对象

package com.kgc.phone;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:序列化案例实操
 */
//1.  需求 : 统计每一个手机号耗费的总上行流量、下行流量、总流量
    //输入数据格式:
/*7  13560436666  120.196.100.99  1116     954  200
id     手机号码    网络ip     上行流量  下行流量     网络状态码*/
    //期望输出数据格式
/*13560436666   1116        954    2070
        手机号码      上行流量        下行流量  总流量*/
    // 1 实现writable接口
public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    //2  反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }
    //3  写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    //4 反序列化方法
    //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
    // 6 编写toString方法,方便后续打印到文本
    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}


(2)编写Mapper类

package com.kgc.phone;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    FlowBean v = new FlowBean();
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
        // 2 切割字段
        String[] fields = line.split("\t");
        // 3 封装对象
        // 取出手机号码
        String phoneNum = fields[1];
        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length-3]);
        long downFlow = Long.parseLong(fields[fields.length-2]);
        k.set(phoneNum);
        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        // 4 写出
        context.write(k,v);
    }
}


(3)编写Reducer类

package com.kgc.phone;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sun_downFlow = 0;
        // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            sum_upFlow+=flowBean.getUpFlow();
            sun_downFlow+=flowBean.getDownFlow();
        }
        // 2 封装对象
        FlowBean resultBean = new FlowBean(sum_upFlow,sun_downFlow);
        // 3 写出
        context.write(key,resultBean);
    }
}


(4)编写Driver驱动类

package com.kgc.phone;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowsumDriver {
    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);
        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 5 指定job的输入原始文件所在目录
        FileInputFormat.addInputPath(job,new Path("hdfs://192.168.56.137:9000/data2/phone.txt"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.56.137:9000/my6"));
        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        job.waitForCompletion(true);
    }
}


目录
相关文章
|
3天前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
140 2
|
2天前
|
分布式计算 并行计算 搜索推荐
Hadoop MapReduce计算框架
【5月更文挑战第10天】HadoopMapReduce计算框架
13 3
|
3天前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
37 0
|
3天前
|
分布式计算 资源调度 Hadoop
java与大数据:Hadoop与MapReduce
java与大数据:Hadoop与MapReduce
27 0
|
3天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
59 0
|
3天前
|
存储 XML JSON
数据传输的艺术:深入探讨序列化与反序列化
数据传输的艺术:深入探讨序列化与反序列化
74 0
|
3天前
|
存储 C#
C#中的序列化和反序列化
C#中的序列化和反序列化
23 0
|
3天前
|
存储 安全 Java
Java一分钟之-Java序列化与反序列化
【5月更文挑战第14天】Java序列化用于将对象转换为字节流,便于存储和网络传输。实现`Serializable`接口使类可被序列化,但可能引发隐私泄露、版本兼容性和性能问题。要避免这些问题,可使用`transient`关键字、控制`serialVersionUID`及考虑使用安全的序列化库。示例代码展示了如何序列化和反序列化对象,强调了循环引用和未实现`Serializable`的错误。理解并妥善处理这些要点对优化代码至关重要。
14 1
|
3天前
|
JSON 安全 Java
Spring Boot 序列化、反序列化
本文介绍了Spring Boot中的序列化和反序列化。Java提供默认序列化机制,通过实现Serializable接口实现对象到字节流的转换。Spring Boot默认使用Jackson处理JSON,可通过注解和配置自定义规则。然而,序列化可能引发安全问题,建议使用白名单、数据校验和安全库。最佳实践包括使用标准机制、自定义规则及注意版本控制。文章还提醒关注性能并提供了相关参考资料。
67 2
|
3天前
|
XML 存储 JSON
c#XML、JSON的序列化和反序列化,看完你就懂了
c#XML、JSON的序列化和反序列化,看完你就懂了
28 0