Spark分布式计算引擎的应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

什么是分布式计算

基本概念

和集中式计算相反,分布式计算的一个计算过程将会在多台机器上进行。组件之间彼此进行交互以实现一个共同的目标,把需要进行大量计算的工程数据分区成小块,由多台计算机分别计算,再上传运算结果后,将结果统一合并得出数据结论。

简单说就是1个人干活和100个人干活的区别。

分布式计算是一门计算机科学的研究课题,涉及到许多分支技术(CS模型、集群技术、通用型分布式计算环境等)。
以下仅涉及其中一部分内容:从分布式计算的理论基础中实现,并且已经得到了大规模生产环境验证的计算框架。

如何实现

要实现分布式计算首先要解决其中两个最重要的问题:

  • 1.如何拆分计算逻辑
  • 2.如何分发计算逻辑

拆分逻辑

计算逻辑要实现分布式,就必须要解决:如何将一个巨大的问题拆分成相对独立的子问题分发到各个机器上求解。

在哪里发生计算的角度来看,所有的计算逻辑都能够划分为这两种类型:

  • 1.能够分发到各个节点上并行执行的
  • 2.需要经过一定量的结果合并之后才能继续执行的

一旦考虑到这一步,问题会变得非常复杂,并不是像说的通过网络获取数据这么简单,各个节点的中间结果往哪存、怎么存,聚合节点什么时间可以开始通过网络拉数据,网络延迟、中断这种情况如何处理,两者之间需要建立一种可靠的通讯机制以保证准确无误地完成计算任务。

为了使整个求解过程完美的衔接起来,你需要解决一系列的通信、容灾、任务调度等问题。

首先对此公开提出解决方案的是Google的MapReduce。

其将一个分布式任务定义为两种类型的Job组成:

  • 1.Map Job对应的就是可以在各个节点上一起执行互不影响的逻辑
  • 2.Reduce Job处理的则是Map产生的中间结果(如果有)

Map和Reduce之间通过一个Shuffle过程来链接。

组成一个完整的分布式计算流程所有的细节问题都将在这个过程中得到有效的处理。

分发逻辑

计算逻辑拆分的问题解决之后,面临的将是如何将拆开的逻辑分发出去。
这里将是分布式计算和集中式计算最大的不同点,移动计算逻辑而不移动数据。

对于Map类型的任务其实可以归为分布式存储系统的问题。
因为基本上所有分布式计算框架都是基于优先数据本地化进行的。
也就是说数据存在哪,计算就分发到哪。

分布式存储系统中海量数据的分发方案:

  • 1.数据量分布
  • 2.数据范围分布
  • 3.Hash分片/消息队列
  • 4.一致性Hash

对于Reduce类型的任务分发的重点取决于之前的Map和Shuffle过程。

mapreduce

发展历程

当前热门的计算引擎都能归类到Hadoop的生态圈中,是08年以来一直都很热门的技术栈。

Hadoop的故事起源于一个搜索引擎公司和一个搜索引擎 -- Google and Nutch。

Nutch包含了商业化的搜索引擎的基本核心功能,包括爬虫系统、文本检索和大规模数据存储、计算。
但是最开始的Nutch是没有大规模数据存储和计算对应的解决方案,直到04年前后,Google发布了《The Google File System》、《MapReduce: Simplified Data Processing on Large Clusters 》和《Bigtable: A Distributed Storage System for Structured Data》,Nutch的作者看了之后用Java实现了Hadoop的三个核心组件:HDFS、MapReduce和HBase分别对应三篇论文阐述的理论思想。

Hadoop最开始是作为Nutch的一个子项目存在的,解决的问题就是搜索引擎对应的大规模数据存储、计算,直到被Yahoo孵化为一个完整的分布式系统解决方案并在08年之后全球广泛应用。

Hadoop作为一个分布式系统,也是由两部分组成的:分布式存储和分布式计算。

MapReduce对应的就是最初Hadoop分布式计算的解决方案。

Hadoop发展到现在,可以集成在其中的计算框架非常多,从应用的角度讲大体上可以分为两种:离线和实时。
有些开发者将这些计算引擎按照出生年代、计算模型归了个类:

第一代:无实时计算能力

MapReduce

第二代:基于MR之上的工具与实时计算能力

tez

Tez:基于MapReduce的DAG执行工具。

storm

Storm:以拓扑结构的方式提供实时计算能力。

第三代:完整的技术栈与更加稳定的系统

spark

Spark:提供更高效、更完整的技术栈。

heron

Heron:比Storm更加稳定的实时系统。

第四代:更为先进的计算思想与框架的统一

flink

Flink:最接近DataFlow数据流思想的解决方案。

beam

Beam:多种计算框架的兼容与支持。

Spark技术细节

逻辑执行图

job_logic_plan

RDD是Spark中的核心概念,直译过来叫做分布式数据集,你可以把它当做一个List,但是这个List里面的元素是分布在不同机器上的,对List的所有操作都将被分发到不同的机器上执行。

两种产生方式

  • 从外部数据源中创建
  • 从一个RDD中转换而来

一个Spark程序可以看做是一个或者多个RDD的完整生命周期,从诞生到发展,到变换,再到输出之后销毁。

两种操作行为

  • Transformation
  • Action

可以简单的理解成对应Map和Reduce阶段。

在RDD上进行的Transformation操作都是惰性执行的,意思就是只有数据真正用到的时候才会进行Transformation操作(这里指的是Action算子)。

这么做的原因要归咎到RDD的计算模型,其实也就是逻辑执行图。

从图中可以看到,当RDD中出现Action操作的时候,Spark将会生成一个Job,并根据RDD的依赖关系画出一张逻辑执行图,注意图中RDD之间的连线,这就是RDD之间的依赖关系。

两种依赖关系

  • 完全依赖:分区中的所有数据都来自父RDD分区中的所有数据
  • 部分依赖

RDD的整个生命周期将会被描述为一张逻辑执行图,包含了一个完整的血缘关系图,费劲心机画出了逻辑图之后再划分物理图时将会有最关键的作用。

RDD的三个两种:

  • 两种产生方式

    • 从外部数据源中创建
    • 从一个RDD中转换而来
  • 两种操作行为

    • Transformation
    • Action
  • 两种依赖关系

    • 完全依赖
    • 部分依赖

物理执行图

job_physics_plan

从RDD上得到逻辑执行图之后,Spark将会划分逻辑图从而生成物理执行图,表现形式为DAG有向无环图,RDD的执行模型将根据物理图的划分而展开。

基于逻辑图可以做的事情:

1、划分Stage
2、执行Pipeline
3、建立回溯机制

根据RDD之间的依赖关系来划分Stage解决了问题:

1、实现Pipeline,不需要保留中间计算结果
2、计算保持高效,Task分布均衡

由于RDD之间的依赖关系被明显的划分为了两种:

  • 对于完全依赖,他是可以完全不管其他RDD或者其他分区的执行进度,直接一条走到底的。
  • 对于部分依赖,他需要父RDD不同分区中的数据,所以他一定是等到所有父RDD计算完毕之后才会执行的。

基于逻辑执行图,我们可以明显的划分出Stage,将部分依赖易到切开,分为两个独立的Stage,由于Stage之间只有完全依赖,它可以毫无顾忌的建立回溯机制,当一个分区数据计算失败或者丢失,可以直接从父RDD对应的分区中恢复,而不是重新计算整个父RDD
基于Stage的独立性,Spark实现了Pipeline的计算方式。

如果所有操作都是立即执行的话那么处理流程应该是这样子的:

list1 = readAllFromHDFS
list2 = list1.map
list3 = list2.map
list4 = list3.filter

每个步骤都需要将全量的数据集加载到内存中操作这是毋庸置疑的。

作为和之前计算过程的对比:

data = readOneLineFromHDFS
data.map.map.filter

数据是作为流一条条从管道的开始一路走到结束。
最为直观的好处就是:不需要加载全量数据集,上一次的计算结果可以马上丢弃。
全量数据集其实是一个很恐怖的东西,全世界都在避免它,所以某种意义上来看,如果没有Shuffle过程,Spark所需要内存其实非常小,一条数据又能占多大空间。
第二,如果不是Pipeline的方式,而是马上触发全量操作,势必需要一个中间容器来保存结果,其实这里就又回到MapReduce的老路,效率很低。

综上,基于逻辑执行图能做的事情有:
1、划分Stage
2、执行Pipeline
3、建立回溯机制

其实这里已经包括了物理图的核心内容:划分Stage和执行Pipeline。

那么如果将整个逻辑图都归为一个Stage岂不是很好?一个Job只包含一个Stage,数据一路从头走到尾,什么中间结果都不需要保存
这么想是很美好,如果RDD之间都是完全依赖的话这是最完美的场景,但是实际case中会包含大量的Shuffle操作符,如果将整张图作为一个Stage,第一个Task将会处理所有分区的数据,也就是某种意义上的单机单线程,因为如果是多个Task的话没有办法各自感知Shuffle过程中所需要的数据状态。

相反如果将每个RDD操作都划分为一个Stage的话就走了MapReduce的老路。

根据RDD之间的依赖关系来划分Stage解决了问题:
1、实现PipeLine,不需要保留中间计算结果
2、计算保持高效,Task分布均衡

Shuffle过程

shuffle

和之前看到的MapReduce Shuffle过程相对比,在高级别上来看别没有多大区别,都是将Mapper中的数据进行Partition之后送到不同的Reducer中,reducer以内存为缓存边拉取数据边计算。

但是在具体实现的低级别角度上两者区别还是比较大的,MR阶段划分明显,Spark中没有明显的划分。
MR中的Mapper即为Spark中的ShuffleMapTask,而Reduce对应的可能是ShuffleMapTask或者ResultTask。
Spark各个阶段通过RDD的算子体现出来。

具体Shuffle过程可以分为:

1、Shuffle Write
2、Shuffle Read

Write过程其实很简单,根据之前划分的Stage,每个Stage的final task的结果将会写磁盘,和mr一样,有多少个分区数就会写多少个文件,后续的Stage将会通过网络来fatch各自对应的数据文件,这俩张图是不同模式下写磁盘文件的过程。

Read过程需要解决几个问题:

1、什么时候fetch数据:依赖的stage中所有ShuffleMapTask都执行完之后才进行fetch,迎合pipe line的思想。
2、如何获得数据位置:ShuffleMapTask结束之后都会想Driver端汇报数据存放位置,ResultTaskfetch数据时都会向Driver查询需要fetch的数据在哪里,Driver端有比较复杂的实现机制。
3、fetch的数据怎么存:刚fetch过来的数据存放在softBuffer中,计算之后的数据可以根据策略选择存放在内存或者内存+磁盘中。

和fetch过程的计算和mr也不一样:
1、Spark:边fetch边计算,因为是无序的,所有没有必要要求所有数据都获取之后才进行计算。
2、MR:MR中强制要求数据有序之后才进行reduce操作,所以MR是一次性fetch所有数据之后才计算。

与MapReduce相比:

  • Height Level:无太大区别,将mapper中的数据进行partition之后送到不同的reducer中。
  • Low Level:实现差别较大,MR阶段划分明显,Spark中没有明显的划分。

内存管理

memory_manage

Spark是用Scala开发完成的,也是一个运行在JVM体系上的系统性框架,所以Spark的内存模型也是基于Java虚拟机来的
基本模型就是:堆、栈、静态代码块和全局空间,在虚拟机的内存模型上Spark将内存做了二次划分。

主要为三个模块:

1、Storage:RDD缓存使用的空间
2、Shuffle:Shuffle过程使用的内存
3、Execution:加载代码执行计算使用的内存

除了虚拟机直接使用的内存之外,Spark还会直接在操作系统中开辟内存空间作为堆外内存来使用,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。

实际使用中需要根据数据集本身的特性和分布情况来调整三个内存模块各自占据的大小。

Spark实践案例

评分卡

目标:1-2小时内出结果。

第一版:7h

原有单机版Python适配出来的程序,效果甚微,执行完毕预计:7小时。

里面存在大量的collect操作,而且是循环中套循环的collect,一次collect将会把所有数据啦拉到driver节点统一处理。

第二版:3h

原有单机暴力适配的基础上,将函数中非常耗时的操作转换、提取和复用,最后只留下了一个Shuffle操作符。
测试表现良好,单字段平均22s,然而600个字段的情况下整个程序跑完大概需要600*20/3600=3h,仍然无法满足要求。

分析

评分卡中任意字段的处理是独立的、互不影响的,能否像数据清洗的处理过程一样,将独立的过程分发出去并行执行:

  • 数据清洗:每个人之间的计算互不影响
  • 评分卡:每个字段之间的计算互不影响

评分卡字段数据Demo:

table1_jpeg

评分卡中的数据虽然逻辑上每列互相独立,但是在物理存储上,按行存储,所有字段的数据形成一个完成的分布式数据集,计算每个字段时都需要对整个数据集进行操作。

优化

1、物理结构上按行存储

parquet

第三版:1.5h

计算每一个字段时,只读取其需要的那一列数据,在数据源头加到的数据量将会减少到5、6百分之一。

2、计算每个字段时都需要对整个数据集进行操作

通过进程的方式实现作业级别的并行,起多个相同的Spark作业去处理不同的字段

第四版:10m

使用进程的缺点:

  • 资源浪费
  • 不可控,系统化困难
  • 中间结果获取复杂

使用线程级别的并行,通过Driver端开启多线程处理不同的字段。

第五版:3m

线程之间共享全局内存,可以异步获得各线程的执行结果并聚合,保存在程序内存变量中直接衔接下一阶段。

评分卡历程:

  • 样本->全量
  • 7小时->3小时->1.5小时->10分钟->3分钟之内

附录

常见的Transformation操作

transformation

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
165 1
|
19天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
2月前
|
存储 NoSQL Java
分布式session-SpringSession的应用
Spring Session 提供了一种创建和管理 Servlet HttpSession 的方案,默认使用外置 Redis 存储 Session 数据,解决了 Session 共享问题。其特性包括:API 及实现用于管理用户会话、以应用容器中性方式替换 HttpSession、简化集群会话支持、管理单个浏览器实例中的多个用户会话以及通过 headers 提供会话 ID 以使用 RESTful API。Spring Session 通过 SessionRepositoryFilter 实现,拦截请求并转换 request 和 response 对象,从而实现 Session 的创建与管理。
分布式session-SpringSession的应用
|
1月前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
98 0
|
2月前
|
存储 NoSQL Java
分布式session-SpringSession的应用
Spring Session 提供了一种创建和管理 Servlet HttpSession 的方案,默认使用外置 Redis 存储 Session 数据,解决 Session 共享问题。其主要特性包括:提供 API 和实现来管理用户会话,以中立方式替换应用程序容器中的 HttpSession,简化集群会话支持,并在单个浏览器实例中管理多个用户会话。此外,Spring Session 允许通过 headers 提供会话 ID 以使用 RESTful API。结合 Spring Boot 使用时,可通过配置 Redis 依赖和支持缓存的依赖实现 Session 共享。
分布式session-SpringSession的应用
|
1月前
|
缓存 网络协议 API
分布式系统应用之服务发现!
分布式系统应用之服务发现!
|
2月前
|
存储 运维 应用服务中间件
阿里云分布式存储应用示例
通过阿里云EDAS,您可以轻松部署与管理微服务应用。创建应用时,使用`CreateApplication`接口基于模板生成新应用,并获得包含应用ID在内的成功响应。随后,利用`DeployApplication`接口将应用部署至云端,返回"Success"确认部署成功。当业务调整需下线应用时,调用`ReleaseApplication`接口释放资源。阿里云EDAS简化了应用全生命周期管理,提升了运维效率与可靠性。[相关链接]提供了详细的操作与返回参数说明。
|
2月前
|
Dubbo Java 应用服务中间件
分布式(基础)-RMI简单的应用
分布式(基础)-RMI简单的应用