【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

简介: 【Hadoop】(四)Hadoop 序列化 及 MapReduce 序列化案例实操

文章目录


1 序列化概述

2 自定义bean对象实现序列化接口(Writable)

3 序列化案例实操


1 序列化概述


20191211104446597.png


20191211104514875.png



2 自定义bean对象实现序列化接口(Writable)


在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。


具体实现bean对象序列化步骤如下7步。


(1)必须实现Writable接口


(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
  super();
}


(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
  out.writeLong(upFlow);
  out.writeLong(downFlow);
  out.writeLong(sumFlow);
}


(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
  upFlow = in.readLong();
  downFlow = in.readLong();
  sumFlow = in.readLong();
}


(5)注意反序列化的顺序和序列化的顺序完全一致


(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。


(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

@Override
public int compareTo(FlowBean o) {
  // 倒序排列,从大到小
  return this.sumFlow > o.getSumFlow() ? -1 : 1;
}


3 序列化案例实操


1.需求

统计每一个手机号耗费的总上行流量、下行流量、总流量


(1)输入数据格式:

7  13560436666  120.196.100.99  1116   954    200
id  手机号码  网络ip    上行流量  下行流量     网络状态码


(2)期望输出数据格式

13560436666   1116        954    2070
手机号码      上行流量        下行流量  总流量


2.需求分析


20191211104555528.png


3.编写MapReduce程序


(1)编写流量统计的Bean对象

package com.kgc.phone;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:序列化案例实操
 */
//1.  需求 : 统计每一个手机号耗费的总上行流量、下行流量、总流量
    //输入数据格式:
/*7  13560436666  120.196.100.99  1116     954  200
id     手机号码    网络ip     上行流量  下行流量     网络状态码*/
    //期望输出数据格式
/*13560436666   1116        954    2070
        手机号码      上行流量        下行流量  总流量*/
    // 1 实现writable接口
public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    //2  反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }
    //3  写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    //4 反序列化方法
    //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
    // 6 编写toString方法,方便后续打印到文本
    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}


(2)编写Mapper类

package com.kgc.phone;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    FlowBean v = new FlowBean();
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
        // 2 切割字段
        String[] fields = line.split("\t");
        // 3 封装对象
        // 取出手机号码
        String phoneNum = fields[1];
        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length-3]);
        long downFlow = Long.parseLong(fields[fields.length-2]);
        k.set(phoneNum);
        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        // 4 写出
        context.write(k,v);
    }
}


(3)编写Reducer类

package com.kgc.phone;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sun_downFlow = 0;
        // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            sum_upFlow+=flowBean.getUpFlow();
            sun_downFlow+=flowBean.getDownFlow();
        }
        // 2 封装对象
        FlowBean resultBean = new FlowBean(sum_upFlow,sun_downFlow);
        // 3 写出
        context.write(key,resultBean);
    }
}


(4)编写Driver驱动类

package com.kgc.phone;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * @author:Tokgo J
 * @date:2019/12/11
 * @aim:
 */
public class FlowsumDriver {
    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);
        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 5 指定job的输入原始文件所在目录
        FileInputFormat.addInputPath(job,new Path("hdfs://192.168.56.137:9000/data2/phone.txt"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.56.137:9000/my6"));
        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        job.waitForCompletion(true);
    }
}


目录
相关文章
|
6月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
186 3
|
10月前
|
存储 分布式计算 算法
|
10月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
102 1
|
10月前
|
数据采集 SQL 分布式计算
|
11月前
|
分布式计算 Hadoop Java
Hadoop MapReduce 调优参数
对于 Hadoop v3.1.3,针对三台4核4G服务器的MapReduce调优参数包括:`mapreduce.reduce.shuffle.parallelcopies`设为10以加速Shuffle,`mapreduce.reduce.shuffle.input.buffer.percent`和`mapreduce.reduce.shuffle.merge.percent`分别设为0.8以减少磁盘IO。
127 1
|
11月前
|
分布式计算 并行计算 搜索推荐
Hadoop MapReduce计算框架
【5月更文挑战第10天】HadoopMapReduce计算框架
94 3
|
10月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
83 0
|
10月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
401 0
|
10月前
|
分布式计算 Java Hadoop
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
86 0
|
10月前
|
分布式计算 Hadoop Java
使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱
使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱
145 0

相关实验场景

更多
下一篇
oss创建bucket