大数据YARN概述

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据YARN概述

1 YARN 架构概述

1.1 原MapReduce框架的不足

  1. JobTracker是集群事务的集中处理点,存在单点故障
  2. JobTracker需要完成的任务太多,既要维护job资源调度的状态又要维护job的task的状态,造成过多的资源消耗
  3. 在taskTracker端,用map/reduce task作为资源的表示过于简单,没有考虑到cpu、内存等资源情况,当把两个需要消耗大内存的task调度到一起,很容易出现OOM
  4. 把资源强制划分为map/reduce slot,当只有map task时,reduce slot不能用;当只有reduce task时,map slot不能用,容易造成资源利用不足

Hadoop里有 两种slots, map slots和reduce slots,map task使用map slots,一一对应,reduce task使用reduce slots。注: 现在越来越多的观点认为应该打破map slots与 reduce slots的界限,应该被视为统一的资源池,they are all resource,从而提高资源的利用率。区分map slots和reduce slots,容易导致某一种资源紧张,而另一个资源却有空闲。在Hadoop的下一代框架MapR中,已经取消了map slots与reduce slots的概念,并将Jobtracker的功能一分为二,用ResourceManager来管理节点资源,用ApplicationMaster来监控与调度作业。ApplicationMaster是每个Application都有一个单独的实例,application是用户提交的一组任务,它可以是一个或多个job的任务组成。Yet Another Resource Negotiator 简称 YARN ,另一种资源协调者,是 Hadoop 的资源管理器。


Yarn的基本思想是将JobTraCker 的资源管理和作业调度/监控两大主要职能拆分为两个独立的进程:一个全局的 Resource Manager ,以及与每个应用对应的 Application Master ( AM )。 Resource Manager 和每个节点上的 Node Manager ( NM )组成了全新的通用操作系统,以分布式的方式管理应用程序。图给出了Yarn的系统架构示意。

Resource Manager 拥有为系统中所有的应用分配资源的决定权.与之相关的是应用的 Application Master ,负责与 Resource Manager 协商资源,并与 Node Managert 办同医工作来执行和监控任务. Resource Manager 有一个可插拔的调度器组件 Scheduler ,它负责为运行中的各种应用分配资源,分配时会受到容量、队列及其他因素的制约。 Scheduler 是一个纯粹的调度器,不负责应用的监控和状态跟踪,也不保证在应用失败或者硬件失败的情况下对任务的重启。 Scheduler 基于应用的资源需求来执行其调度功能,使用了称为资源容器 ( Container )的抽象概念,其中包括了多种资源维度,如内存、 cpu 、磁盘及网络。 Node Manager 是与每台机器对应的从属进程( Slave ) ,负责启动应用的 Container , 监控资源使用情况(如 CPU 、内存、磁盘和网络),并且报告给 Resource Manager 。每个应用的 Application Master 负责与 Scheduler 协商合适的 Container ,跟踪应用的状态,以及监控它们的进度。从系统的角度讲, Application Master 也是以一个普通 Container 的身份运行的。在新的Yarn系统下, MapReduce 的一个关键思想是,确保与现有 MapReduce 应用和用户兼容,也就是重用现有的 MapReduce 框架。

1.2 Yarn的ApplicationMaster介绍

ApplicationMaster实际上是特定计算框架的一个实例,每种计算框架都有自己独特的ApplicationMaster,负责与ResourceManager协商资源,并和NodeManager协同来执行和监控Container。MapReduce只是可以运行在YARN上一种计算框架。


ApplicationMaster职能


(1) 初始化向ResourceManager报告自己的活跃信息的进程


(2) 计算应用程序的的资源需求。


(3) 将需求转换为YARN调度器可以理解的ResourceRequest。


(4) 与调度器协商申请资源


(5) 与NodeManager协同合作使用分配的Container。


(6) 跟踪正在运行的Container状态,监控它的运行。


(7) 对Container或者节点失败的情况进行处理,在必要的情况下重新申请资源。

2 HDFS、YARN、MapReduce 三者关系

HDFS:资源存储最右边的三个Node

YARN:资源调度,RM资源管理去调用容器

MapReduce:容器去检索,然后写入到HDFS,先Map然后Reduce

2.1 Yarn 的工作流程

Yam 比经典的 MapReduce 包括更多的实体。总体上讲, Yarn 的工作流程包括以下 5 个步骤:

(1) 客户端提交 MapReduce 任务。

(2) Yarn 的 Resource Manager 负责协调集群上计算资源的分配。

(3) Yarn 的 Node Manager 负责启动和监控集群中 Container 。

(4) Application Master 负责协调运行 MapReduce 任务,它和 MapReduce 任务在Container 中运行,这些 Container 由 Resource Manager 分配,对 Node Manager 进行管理。

(5) 分布式文件系统 (HDFS) 用来与其他实体间共享作业文件。

3 HADOOP之YARN详解

前面我们学习了Hadoop中的MapReduce,我们知道MapReduce任务是需要在YARN中执行的,那下面

我们就来学习一下Hadoop中的YARN

YARN的由来 从Hadoop2开始,官方把资源管理单独剥离出来,主要是为了考虑后期作为一个公共的资源管理平台,任何满足规则的计算引擎都可以在它上面执行。所以YARN可以实现HADOOP集群的资源共享,不仅仅可以跑MapRedcue,还可以跑Spark、Flink。 YARN架构分析咱们之前部署Hadoop集群的时候也对YARN的架构有了基本的了解YARN主要负责集群资源的管理和调度 ,支持主从架构,主节点最多可以有2个,从节点可以有多个其中:


ResourceManager:是主节点,主要负责集群资源的分配和管理

NodeManager:是从节点,主要负责当前机器资源管理


YARN资源管理模型

YARN主要管理内存和CPU这两种资源类型

当NodeManager节点启动的时候自动向ResourceManager注册,将当前节点上的可用CPU信息和内存

信息注册上去。这样所有的nodemanager注册完成以后,resourcemanager就知道目前集群的资源总量了。那我们现在来看一下我这个一主两从的集群资源是什么样子的,打开yarn的8088界面

注意,这里面显示的资源是集群中所有从节点的资源总和,不包括主节点的资源,

那我们再详细看一下每一个从节点的资源信息

但是这个数值是对不上的,我的linux机器每台只给它分配了2G的内存

通过free -m可以看到

[root@bigdata02 ~]# free -m
 total used free shared buff/cache availab
Mem: 1819 372 1133 9 313 12
Swap: 2047 0 2047

CPU只分配了1个

通过top命令可以看到

top - 11:30:35 up 1:05, 1 user, load average: 0.00, 0.08, 0.10
Tasks: 99 total, 1 running, 98 sleeping, 0 stopped, 0 zombie
%Cpu0 : 0.3 us, 0.3 sy, 0.0 ni, 99.3 id, 0.0 wa, 0.0 hi, 0.0 si

那为什么在这里显示是内存是8G,CPU是8个呢?

不要着急,我们先喝杯咖啡,看一下下面这2个参数


yarn.nodemanager.resource.memory-mb:单节点可分配的物理内存总量,默认是8MB*1024, 即8G

yarn.nodemanager.resource.cpu-vcores:单节点可分配的虚拟CPU个数,默认是8 看到没有,这都是默认单节点的内存和CPU信息,就算你这个机器没有这么多资源,但是在yarndefault.xml中有这些默认资源的配置,这样当nodemanager去上报资源的时候就会读取这两个参数的 值,这也就是为什么我们在前面看到了单节点都是8G内存和8个cpu,其实我们的linux机器是没有这么大 资源的,那你这就是虚标啊,肯定不能这样干,你实际有多少就是多少,所以我们可以修改这些参数的 值,修改的话就在yarn-site.xml中进行配置即可,改完之后就可以看到真实的信息了,在这我就先不改

了,针对我们的学习环境不影响使用,修改的意义不大,你知道这回事就行了。


YARN中的调度器

接下来我们来详细分析一下YARN中的调度器,这个是非常实用的东西,面试的时候也会经常问到。

大家可以想象一个场景,我们集群的资源是有限的,在实际工作中会有很多人向集群中提交任务,那这时


候资源如何分配呢?

如果你提交了一个很占资源的任务,这一个任务就把集群中90%的资源都占用了,后面别人再提交任务,剩下的资源就不够用了,这个时候怎么办?

让他们等你的任务执行完了再执行?还是说你把你的资源匀出来一些分给他,你少占用一些,让他也能慢具体如何去做这个是由YARN中的调度器负责的

4 YARN中支持三种调度器

yarn默认使用的是最简单的FIFO调度器,即一个default队列

1:FIFO Scheduler:先进先出(first in, first out)调度策略

2:Capacity Scheduler:FIFO Scheduler的多队列版本

3:FairScheduler(公平调度管理):多队列,多用户共享资源, 前提:假设任务具有相同优先级,平均分配资源

下面来看图分析一下这三种调度器的特性

4.1 FIFO Scheduler

是先进先出的,大家都是排队的,如果你的任务申请不到足够的资源,那你就等着,等前面的任务执行结束释放了资源之后你再执行。这种在有些时候是不合理的,因为我们有一些

任务的优先级比较高,我们希望任务提交上去立刻就开始执行,这个就实现不了了。


4.2 CapacityScheduler

它是FifoScheduler的多队列版本,就是我们先把集群中的整块资源划分成多份,我们可以人为的给这些资源定义使用场景,例如图里面的queue A里面运行普通的任务,queueB中运行优先级比较高的任务。这两个队列的资源是相互对立的

但是注意一点,队列内部还是按照先进先出的规则。


4.3 FairScheduler

支持多个队列,每个队列可以配置一定的资源,每个队列中的任务共享其所在队列

的所有资源,不需要排队等待资源具体是这样的,假设我们向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源,当

你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用

在实际工作中我们一般都是使用第二种, CapacityScheduler ,从hadoop2开始, CapacitySchedule r也是集群中的默认调度器了

那下面我们到集群上看一下,点击左侧的Scheduler查看

20210712180858988.png

Capacity,这个是集群的调度器类型,

下面的root是根的意思,他下面目前只有一个队列,叫default,我们之前提交的任务都会进入到这个队

列中。

下面我们来修改一下,增加多个队列

5 案例:YARN多资源队列配置和使用

我们的需求是这样的,希望增加2个队列,一个是online队列,一个是offline队列

然后向offline队列中提交一个mapreduce任务

online队列里面运行实时任务

offline队列里面运行离线任务,我们现在学习的mapreduce就属于离线任务

实时任务我们后面会学习,等讲到了再具体分析。

这两个队列其实也是我们公司中最开始分配的队列,不过随着后期集群规模的扩大和业务需求的增加,后

期又增加了多个队列。

在这里我们先增加这2个队列,后期再增加多个也是一样的。

具体步骤如下:

修改集群中 etc/hadoop 目录下的 capacity-scheduler.xml 配置文件

修改和增加以下参数,针对已有的参数,修改value中的值,针对没有的参数,则直接增加

这里的 default 是需要保留的,增加 online,offline ,这三个队列的资源比例为 7:1:2

具体的比例需要根据实际的业务需求来,看你们那些类型的任务比较多,对应的队列中资源比例就调高一

些,我们现在暂时还没有online任务,所以我就把online队列的资源占比设置的小一些。

先修改bigdata01上的配置

[root@bigdata01 hadoop]# vi capacity-scheduler.xml
    <property>
        <name>yarn.scheduler.capacity.root.queues</name>
        <value>default,online,offline</value>
        <description>队列列表,多个队列之间使用逗号分割</description>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.default.capacity</name>
        <value>70</value>
        <description>default队列70%</description>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.online.capacity</name>
        <value>10</value>
        <description>online队列10%</description>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.offline.capacity</name>
        <value>20</value>
        <description>offline队列20%</description>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
        <value>70</value>
        <description>Default队列可使用的资源上限.</description>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
        <value>10</value>
        <description>online队列可使用的资源上限.</description>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
        <value>20</value>
        <description>offline队列可使用的资源上限.</description>
    </property>

修改好以后再同步到另外两个节点上

[root@bigdata01 hadoop]# scp -rq capacity-scheduler.xml bigdata02:/data/soft/
[root@bigdata01 hadoop]# scp -rq capacity-scheduler.xml bigdata03:/data/soft/

然后重启集群才能生效

[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh 
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh

进入yarn的web界面,查看最新的调度器队列信息

注意了,现在默认提交的任务还是会进入default的队列,如果希望向offline队列提交任务的话,需要指

定队列名称,不指定就进默认的队列

在这里我们还需要同步微调一下代码,否则我们指定的队列信息 代码是无法识别的

拷贝WordCountJob类,新的类名为 WordCountJobQueue

主要在job配置中增加一行代码

package com.oldlu.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * 指定队列名称
 *
 * @author oldlu
 * @version 1.0
 * @date 2021/7/30 0030 14:16
 */
public class WordCountJobQueue {
    /**
     * Map阶段
     */
    Logger logger = LoggerFactory.getLogger(MyMapper.class);
    /**
     * 需要实现map函数
     * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
     * @param k1
     * @param v1
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        //输出k1,v1的值
        //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
        //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
        //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
        //对获取到的每一行数据进行切割,把单词切割出来
        String[] words = v1.toString().split(" ");
        //迭代切割出来的单词数据
        for (String word : words) {
            //把迭代出来的单词封装成<k2,v2>的形式
            Text k2 = new Text(word);
            LongWritable v2 = new LongWritable(1L);
            //把<k2,v2>写出去
            context.write(k2,v2);
        }
    }
}
/**
 * Reduce阶段
 */
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongW
 Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
 * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
 * @param k2
 * @param v2s
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context co
        throws IOException, InterruptedException {
        //创建一个sum变量,保存v2s的和
        long sum = 0L;
        //对v2s中的数据进行累加求和
        for(LongWritable v2: v2s){
        //输出k2,v2的值
        //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"
        //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
        sum += v2.get();
        }
        //组装k3,v3
        Text k3 = k2;
        LongWritable v3 = new LongWritable(sum);
        //输出k3,v3的值
        //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
        //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
        }
        }
/**
 * 组装Job=Map+Reduce
 */
public static void main(String[] args) {
        try{
        //指定Job需要的配置参数
        Configuration conf = new Configuration();
        //解析命令行中-D后面传递过来的参数,添加到conf中
        String[] remainingArgs = new GenericOptionsParser(conf, args).get
        //创建一个Job
        Job job = Job.getInstance(conf);
        //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个
        job.setJarByClass(WordCountJobQueue.class);
        //指定输入路径(可以是文件,也可以是目录)
        FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
        //指定输出路径(只能指定一个不存在的目录)
        FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
        //指定map相关的代码
        job.setMapperClass(MyMapper.class);
        //指定k2的类型
        job.setMapOutputKeyClass(Text.class);
        //指定v2的类型
        job.setMapOutputValueClass(LongWritable.class);
        //指定reduce相关的代码
        job.setReducerClass(MyReducer.class);
        //指定k3的类型
        job.setOutputKeyClass(Text.class);
        //指定v3的类型
        job.setOutputValueClass(LongWritable.class);
        //提交job
        job.waitForCompletion(true);
        }catch(Exception e){
        e.printStackTrace();
        }
    }
 }

重新编译打包,上传到服务器上面

执行任务

[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dep

如果我们去掉指定队列名称的配置,此时还会使用default队列

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -rm -r /outqueue
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dep

到yarn中查看任务的信息,显示是在default队列中执行

这就是YARN中调度器多资源队列的配置,在工作中我们只要掌握如何使用这些队列就可以了,具体如何配置是我们向运维同学提需求,他们去配置。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
64 2
|
1月前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
79 0
|
1月前
|
存储 分布式计算 资源调度
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
74 5
|
1月前
|
资源调度 数据可视化 大数据
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
35 4
|
1月前
|
XML 分布式计算 资源调度
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
146 5
|
1月前
|
XML 资源调度 网络协议
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(二)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(二)
84 4
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
72 4
|
1月前
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
26 1
|
1月前
|
存储 分布式计算 NoSQL
大数据-144 Apache Kudu 基本概述 数据模型 使用场景
大数据-144 Apache Kudu 基本概述 数据模型 使用场景
36 0
|
1月前
|
SQL 存储 OLAP
大数据-133 - ClickHouse 基础概述 全面了解
大数据-133 - ClickHouse 基础概述 全面了解
39 0