MapReduce入门(一篇就够了)(上)

简介: MapReduce入门(一篇就够了)(上)

01 引言

MapReduceHadoop生态圈的一部分,也是最核心的一部分,本文来讲解下。

02 MapReduce 概述

2.1 MapReduce 定义

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架,其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

2.2 MapReduce 产生缘由

为什么需要MapReduce?

  • 海量数据在单机上处理因为硬件资源限制,无法胜任。
  • 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。
  • 引入MapReduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。

设想一个海量数据场景下的wordcount需求:

  • 单机版:内存受限,磁盘受限,运算能力受限
  • 分布式:文件分布式存储(HDFS)、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)、运算程序如何分发、程序如何分配运算任务(切片)、两阶段的程序如何启动?如何协调?、整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。

2.3 MapReduce与Yarn的关系

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台。而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。

YARN的重要概念

  1. yarn并不清楚用户提交的程序的运行机制
  2. yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)
  3. yarn中的主管角色叫ResourceManager
  4. yarn中具体提供运算资源的角色叫NodeManager
  5. 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(MapReduce只是其中的一种),比如MapReducestorm程序,spark程序,tez……
  6. 所以,sparkstorm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可
  7. Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享

2.4 MapReduce 中的序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系。。。。),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制Writable,精简,高效。

简单代码验证两种序列化机制的差别:

public class TestSeri {
  public static void main(String[] args) throws Exception {
    //定义两个ByteArrayOutputStream,用来接收不同序列化机制的序列化结果
    ByteArrayOutputStream ba = new ByteArrayOutputStream();
    ByteArrayOutputStream ba2 = new ByteArrayOutputStream();
    //定义两个DataOutputStream,用于将普通对象进行jdk标准序列化
    DataOutputStream dout = new DataOutputStream(ba);
    DataOutputStream dout2 = new DataOutputStream(ba2);
    ObjectOutputStream obout = new ObjectOutputStream(dout2);
    //定义两个bean,作为序列化的源对象
    ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f);
    ItemBean itemBean = new ItemBean(1000L, 89.9f);
    //用于比较String类型和Text类型的序列化差别
    Text atext = new Text("a");
    // atext.write(dout);
    itemBean.write(dout);
    byte[] byteArray = ba.toByteArray();
    //比较序列化结果
    System.out.println(byteArray.length);
    for (byte b : byteArray) {
      System.out.print(b);
      System.out.print(":");
    }
    System.out.println("-----------------------");
    String astr = "a";
    // dout2.writeUTF(astr);
    obout.writeObject(itemBeanSer);
    byte[] byteArray2 = ba2.toByteArray();
    System.out.println(byteArray2.length);
    for (byte b : byteArray2) {
      System.out.print(b);
      System.out.print(":");
    }
  }
}

如何自定义对象实现MR中的序列化接口?(里面有很多mr的概念,可以阅读完本文再看这里

  • 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序,此时,自定义的bean实现的接口应该是:
public class FlowBean implements WritableComparable<FlowBean>

需要自己实现的方法是:

/**
   * 反序列化的方法,反序列化时,从流中读取到的各个字段的顺序应该与序列化时写出去的顺序保持一致
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    upflow = in.readLong();
    dflow = in.readLong();
    sumflow = in.readLong();
  }
  /**
   * 序列化的方法
   */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(upflow);
    out.writeLong(dflow);
    //可以考虑不序列化总流量,因为总流量是可以通过上行流量和下行流量计算出来的
    out.writeLong(sumflow);
  }
  @Override
  public int compareTo(FlowBean o) {
    //实现按照sumflow的大小倒序排序
    return sumflow>o.getSumflow()?-1:1;
  }

03 MapReduce 工作原理

3.1 MapReduce 进程

为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

MapReduce就是这样一个分布式程序的通用框架,整体结构如下(在分布式运行时有三类实例进程):

  • MRAppMaster:负责整个程序的过程调度及状态协调
  • MapTask:负责map阶段的整个数据处理流程
  • ReduceTask:负责reduce阶段的整个数据处理流程

3.2 MapReduce 运行机制

流程描述如下:

① 一个MR程序启动的时候,最先启动的是MRAppMasterMRAppMaster启动后根据本次job的描述信息,计算出需要的MapTask实例数量,然后向集群申请机器启动相应数量的MapTask进程;

MapTask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

  • 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对;
  • 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存;
  • 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件。

MRAppMaster监控到所有MapTask进程任务完成之后,会根据客户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围(数据分区);

ReduceTask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台MapTask运行所在机器上获取到若干个MapTask输出结果文件,并在本地进行重新归并排序,然后按照相同keyKV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。

3.2.1 MapTask 并行度决定机制

从上面的MapReduce运行流程可以知道,一个jobmap阶段的并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:

  • 将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split
  • 然后每一个split分配一个mapTask并行实例处理;
  • 这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成。
3.2.1.1 FileInputFormat切片机制

FileInputFormat切片机制原理如下图:

3.2.1.2 FileInputFormat切片步骤

Step1: 切片定义在InputFormat类中的getSplit()方法;


Step2: FileInputFormat中默认的切片机制:

  • 简单地按照文件的内容长度进行切片
  • 切片大小,默认等于block大小
  • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:file1.txt(320M)、file2.txt (10M),经过FileInputFormat的切片机制运算后,形成的切片信息如下:

file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M

Step3: FileInputFormat中切片的大小的参数配置

通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));切片主要由这几个值来运算决定:

  • minsize(切片最小值,默认值:1 ):参数调的比blockSize大,则可以让切片变得比blocksize还大,配置参数: mapreduce.input.fileinputformat.split.minsize
  • maxsize(切片最大值,默认值:Long.MAXValue ):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值,配置参数:mapreduce.input.fileinputformat.split.maxsize
  • blocksize:切片大小

3.2.2 MapTask 并行度调优

选择并发数的影响因素:

  • 运算节点的硬件配置
  • 运算任务的类型:CPU密集型还是IO密集型
  • 运算任务的数据量

举例:

  • 如果硬件配置为2*12core + 64G,恰当的map并行度是大约每个节点20-100个map,最好每个map的执行时间至少一分钟。
  • 如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该jobmap或者reduce数,每一个task(map|reduce)setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。

配置taskjvm重用可以改善该问题:

  • mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1,也就是说一个task启一个JVM);
  • 如果input的文件非常的大,比如1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB
目录
相关文章
|
11月前
|
分布式计算 Hadoop 关系型数据库
Sqoop入门(一篇就够了)(下)
Sqoop入门(一篇就够了)(下)
109 0
|
11月前
|
SQL 分布式计算 调度
Spark入门(一篇就够了)(三)
Spark入门(一篇就够了)(三)
217 0
|
10月前
|
存储 NoSQL Linux
JuiceFS-开源分布式文件系统入门(一篇就够了)(下)
JuiceFS-开源分布式文件系统入门(一篇就够了)(下)
241 0
|
11月前
|
存储 缓存 分布式计算
Spark入门(一篇就够了)(一)
Spark入门(一篇就够了)(一)
366 0
|
11月前
|
存储 分布式计算 资源调度
Hadoop入门(一篇就够了)(上)
Hadoop入门(一篇就够了)(上)
286 0
|
10月前
|
存储 Kubernetes API
JuiceFS-开源分布式文件系统入门(一篇就够了)(上)
JuiceFS-开源分布式文件系统入门(一篇就够了)(上)
443 0
|
SQL 分布式计算 关系型数据库
Sqoop入门(一篇就够了)
Sqoop入门(一篇就够了)
5353 1
Sqoop入门(一篇就够了)
|
11月前
|
分布式计算 资源调度 Java
MapReduce入门(一篇就够了)(下)
MapReduce入门(一篇就够了)(下)
107 0
|
11月前
|
分布式计算 Hadoop 关系型数据库
Sqoop入门(一篇就够了)(上)
Sqoop入门(一篇就够了)(上)
267 0
|
11月前
|
SQL JSON 分布式计算
Spark入门(一篇就够了)(二)
Spark入门(一篇就够了)(二)
103 0