开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink on YARN 流程之Flink Cluster 启动流程是什么?

Flink on YARN 流程之Flink Cluster 启动流程是什么?

展开
收起
Lee_tianbai 2020-12-30 13:11:55 1025 0
1 条回答
写回答
取消 提交回答
    1. YARN RM 中的 ClientRMService(为普通用户提供的 RPC 服务组件, 处理来自客户端的各种 RPC 请求,比如查询 YARN 集群信息,提交、终 止应用等)接收到应用提交请求,简单校验后将请求转交给 RMAppManager(YARN RM 内部管理应用生命周期的组件);
    2. RMAppManager 根据应用提交上下文内容创建初始状态为 NEW 的应 用,将应用状态持久化到 RM 状态存储服务(例如 ZooKeeper 集群,RM 状态存储服务用来保证 RM 重启、HA 切换或发生故障后集群应用能够正 常恢复,后续流程中的涉及状态存储时不再赘述),应用状态变为 NEW_ SAVING;
    3. 应用状态存储完成后,应用状态变为 SUBMITTED;RMAppManager 开始向 ResourceScheduler(YARN RM 可拔插资源调度器,YARN 自 带 三 种 调 度 器 FifoScheduler/FairScheduler/CapacityScheduler,其 中 CapacityScheduler 支持功能最多使用最广泛,FifoScheduler 功能最简单 基本不可用,今年社区已明确不再继续支持 FairScheduler,建议已有用户 迁至 CapacityScheduler)提交应用,如果无法正常提交(例如队列不存在、 不是叶子队列、队列已停用、超出队列最大应用数限制等)则抛出拒绝该应 用,应用状态先变为 FINAL_SAVING 触发应用状态存储流程并在完成后变 为 FAILED;如果提交成功,应用状态变为 ACCEPTED;
    4. 开始创建应用运行实例 (ApplicationAttempt,由于一次运行实例中最重要 的组件是 ApplicationMaster,下文简称 AM,它的状态代表了 ApplicationAttempt 的当前状态,所以 ApplicationAttempt 实际也代表了 AM), 初始状态为 NEW;
    5. 初始化应用运行实例信息,并向 ApplicationMasterService(AM&RM 协 议接口服务,处理来自 AM 的请求,主要包括注册和心跳)注册,应用实例 状态变为 SUBMITTED;
    6. RMAppManager 维护的应用实例开始初始化 AM 资源申请信息并重新校 验队列,然后向 ResourceScheduler 申请 AM Container(Container 是 YARN 中资源的抽象,包含了内存、CPU 等多维度资源),应用实例状态变 为 ACCEPTED;
    7. ResourceScheduler 会根据优先级(队列 / 应用 / 请求每个维度都有优 先级配置)从根队列开始层层递进,先后选择当前优先级最高的子队列、应 用直至具体某个请求,然后结合集群资源分布等情况作出分配决策,AM Container 分配成功后,应用实例状态变为 ALLOCATED_SAVING,并触 发应用实例状态存储流程,存储成功后应用实例状态变为 ALLOCATED;
    8. RMAppManager 维护的应用实例开始通知 ApplicationMasterLauncher (AM 生命周期管理服务,负责启动或清理 AM container)启动 AM container,ApplicationMasterLauncher 与 YARN NodeManager(下 文 简 称 YARN NM,与 YARN RM 保持通信,负责管理单个节点上的全部资源、 Container 生命周期、附属服务等,监控节点健康状况和 Container 资源使 用)建立通信并请求启动 AM container;
    9. ContainerManager(YARN NM 核心组件,管理所有 Container 的生命 周期)接收到 AM container 启动请求,YARN NM 开始校验 Container Token 及资源文件,创建应用实例和 Container 实例并存储至本地,结果 返回后应用实例状态变为 LAUNCHED;
    10. ResourceLocalizationService(资源本地化服务,负责 Container 所需 资源的本地化。它能够按照描述从 HDFS 上下载 Container 所需的文件资 源,并尽量将它们分摊到各个磁盘上以防止出现访问热点)初始化各种服务 组件、创建工作目录、从 HDFS 下载运行所需的各种资源至 Container 工 作 目 录(路 径 为 : ${yarn.nodemanager.local-dirs}/usercache/${user}/ appcache//);
    11. ContainersLauncher(负 责 container 的 具 体 操 作, 包 括 启 动、 重 启、恢复和清理等)将待运行 Container 所需的环境变量和运行命令写到 Container 工作目录下的 launch_container.sh 脚本中,然后运行该脚本 启动 Container;
    12. Container 进程加载并运行 ClusterEntrypoint(Flink JobManager 入口 类,每种集群部署模式和应用运行模式都有相应的实现,例如在 YARN 集群 部署模式下,per-job 应用运行模式实现类是 YarnJobClusterEntrypoint, session 应用运行模式实现类是 YarnSessionClusterEntrypoint),首先初 始化相关运行环境: ● 输出各软件版本及运行环境信息、命令行参数项、classpath 等信息; ● 注册处理各种 SIGNAL 的 handler: 记录到日志 ● 注册 JVM 关闭保障的 shutdown hook:避免 JVM 退出时被其他 shutdown hook 阻塞 ● 打印 YARN 运行环境信息:用户名 ● 从运行目录中加载 flink conf ● 初始化文件系统 ● 创 建 并 启 动 各 类 内 部 服 务(包 括 RpcService、HAService、BlobServer、 HeartbeatServices、MetricRegistry、ExecutionGraphStore 等) ● 将 RPC address 和 port 更新到 flink conf 配置
    13. 启 动 ResourceManager(Flink 资 源 管 理 核 心 组 件, 包 含 YarnResourceManager 和 SlotManager 两个子组件,YarnResourceManager 负责外部资源管理,与 YARN RM 建立通信并保持心跳,申请或释放 TaskManager 资源,注销应用等;SlotManager 则负责内部资源管理,维 护全部 Slot 信息和状态)及相关服务,创建异步 AMRMClient,开始注册 AM,注册成功后每隔一段时间(心跳间隔配置项:${yarn.heartbeat.interval},默认 5s)向 YARN RM 发送心跳来发送资源更新请求和接受资源变更 结果。YARN RM 内部该应用和应用运行实例的状态都变为 RUNNING,并通知 AMLivelinessMonitor 服务监控 AM 是否存活状态,当心跳超过一 定时间(默认 10 分钟)触发 AM failover 流程;
    14. 启动 Dispatcher(负责接收用户提供的作业,并且负责为这个新提交的 作业拉起一个新的 JobManager)及相关服务(包括 REST endpoint 等), 在 per-job 运行模式下,Dispatcher 将直接从 Container 工作目录加载 JobGraph 文件;在 session 运行模式下,Dispatcher 将在接收客户端提 交的 Job(_ 通过 BlockServer 接收 job graph 文件)后再进行后续流程;
    15. 根据 JobGraph 启动 JobManager(负责作业调度、管理 Job 和 Task 的生命周期),构建 ExecutionGraph(JobGraph 的并行化版本,调度层最 核心的数据结构);
    16. JobManager 开始执行 ExecutionGraph,向 ResourceManager 申请 资源;
    17. ResourceManager 将资源请求加入等待请求队列,并通过心跳向 YARN RM 申请新的 Container 资源来启动 TaskManager 进程;后续流程如果有 空闲 Slot 资源,SlotManager 将其分配给等待请求队列中匹配的请求,不 用再通过 18. YarnResourceManager 申请新的 TaskManager;
    18. YARN ApplicationMasterService 接收到资源请求后,解析出新的资源 请求并更新应用请求信息;
    19. YARN ResourceScheduler 成功为该应用分配资源后更新应用信息, ApplicationMasterService 接收到 Flink JobManager 的下一次心跳时返 回新分配资源信息;
    20. Flink ResourceManager 接收到新分配的 Container 资源后,准备好 TaskManager 启 动 上 下 文(ContainerLauncherContext, 生 成 TaskManager 配置并上传至分布式存储,配置其他依赖和环境变量等),然后向 YARN NM 申请启动 TaskManager 进程,YARN NM 启动 Container 的 流程与 AM Container 启动流程基本类似,区别在于应用实例在 NM 上已存 在并未 RUNNING 状态时则跳过应用实例初始化流程;
    21. TaskManager 进 程 加 载 并 运 行 YarnTaskExecutorRunner(Flink TaskManager 入口类),初始化流程完成后启动 TaskExecutor(负责执行 Task 相关操作);
    22. TaskExecutor 启 动 后 先 向 ResourceManager 注 册,成功后再向 SlotManager 汇报自己的 Slot 资源与状态;SlotManager 接收到 Slot 空 闲资源后主动触发 Slot 分配,从等待请求队列中选出合适的资源请求后,向 TaskManager 请求该 Slot 资源
    23. TaskManager 收到请求后检查该 Slot 是否可分配(不存在则返回异常信 息)、Job 是否已注册(没有则先注册再分配 Slot),检查通过后将 Slot 分配 给 JobManager;
    24. JobManager 检查 Slot 分配是否重复,通过后通知 Execution 执行部署 task 流程,向 TaskExecutor 提交 task;TaskExecutor 启动新的线程运行 Task。
    2020-12-30 13:14:12
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载