EMR 打造高效云原生数据分析引擎

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: EMR-Jindo是EMR推出的云原生 OLAP 引擎。凭借该引擎,EMR成为第一个云上TPC-DS成绩提交者。经过持续不断地内核优化,目前基于最新 EMR-Jindo 引擎的 TPC-DS 成绩又有了大幅提高,达到了3615071,成本降低到 0.76 CNY。在2019杭州云栖大会大数据技术专场,阿里云阿里巴巴计算平台事业部 EMR 技术专家辛庸向大家分享了如何基于开源体系如何打造云上数据分析平台E-MarReduce(EMR)、EMR-Jindo 引擎背后的相关技术以及以 EMR-Jindo 为核心的云上大数据架构方案。

本场视频链接:EMR打造高效云原生数据分析引擎

本场ppt材料:https://www.slidestalk.com/AliSpark/2019___0926_110365


基于开源体系打造云上数据分析平台

客户选择开源方案的原因主要有以下几点:

灵活多样的业务场景:目前即便是一个小企业,其数据存储也可能是多种多样的,比如业务数据、日志数据和图数据等,这种情况下,需要有一个高度定制化的系统来串联不同的业务场景;
自己有专业的运维能力:开源系统有充足的人才储备,丰富的网上资料与开源的强大后盾,可以确保公司业务的顺利开展;
多种业务需求vs成本压力:每种云上产品有自己的使用场景,对于中小企业来说购买多种云产品将会造成很大的成本压力,而通过开源体系维护一套系统,在集成用户业务中所需组件的同时,可以降低用户的成本。
image.png

下图是阿里巴巴EMR系统的产品架构图。用户上云的方式主要有两种,一种是购买ECS资源自己搭建一套开源系统;另一种是直接选择阿里巴巴的EMR系统。第一种方式由于开源系统组件多,涉及到了Spark、Hive、Flink和TensorFlow等,从零搭建一套完整的大数据系统对于用户来讲非常复杂,尤其是成百上千的集群规模也给运维造成了很大的挑战。而使用EMR系统具有以下优点:
1) 阿里云EMR系统可以帮助用户一键化自动部署、配置相关组件,开箱即用,同时还会根据用户的机器类型进行参数的自动推荐调优。
2) 阿里云EMR系统与阿里云其他产品实现了打通,比如数据存放在OSS,EMR系统无需额外再做认证配置,便可以很方便地读取OSS上的数据;
3) 阿里云EMR系统集成了很多自研插件,这些插件在其他产品中是没有的;
4) 阿里云EMR系统的所有组件兼容开源但优于开源,如Flink集成了阿里云自研的Blink和TensorFlow(PAI),这也是阿里云为社区做的一点贡献,目的是为了让用户能用到阿里云内部的技术;
5) 阿里云EMR系统提供了全平台的作业诊断与告警组件APM来实现自动化运维,大大降低集群运维的复杂性;
6) 阿里云EMR系统还与DataWorks对接,用户可以以DataWorks为入口,傻瓜式地使用EMR系统。

image.png

EMR系统的目标主要有以下三个:

平台化:将EMR做成一个统一的云上数据分析平台,帮助用户打造全栈式的大数据解决方案,支持全系列VM容器化,提供企业级HAS和大数据APM;
技术社区&深度:持续深耕技术社区,打造大数据友好的云 Native 存储,同时将技术回馈给社区,为社区做贡献;
生态:EMR系统将结合阿里云其他产品构建一个生态,接入Blink、PAI,集成OSS、OTS方案。

image.png

EMR-Jindo:云原生高效数据分析引擎

下图展示了TPC-DS的基准测试报告,可以发现在2019年3月份10TB的测试中,性能指标得分是182万左右,成本是0.31 USD;而2019年十月份同样的测试性能指标得分已经变成526万,成本下降到0.53 CNY,也就是说经过半年左右性能提升了2.9倍,成本缩减到原来的四分之一。同时阿里巴巴还成为了首个提交TPC-DS测试100TB测试报告的厂商。这些成绩的背后是EMR-Jindo引擎的支持。

image.png

EMR-Jindo引擎架构主要分为两部分:

Jindo-Spark:EMR内部全面优化的Spark高效计算引擎,可以处理多种计算任务;
Jindo-FS:自研的云原生存储引擎,兼容开源HDFS的接口,兼顾性能与价格。

image.png

1) Jindo-Spark

Jindo-Spark高效计算引擎对Spark采取了一系列优化措施,比如Runtime Filter支持自适应的运行时数据裁剪;Enhanced Join Reorder来解决外连接重排等问题;TopK支持推理并下推 TopK 逻辑,帮助尽早地过滤数据;File Index支持文件级别过滤和min/max/bloom/倒排等;自研开发了Relational Cache,实现使用一套引擎就可以将查询从分钟级提升为亚秒级;针对特定的场景推出Spark Transaction功能,为Spark引入Full ACID支持;实现了Smart Shuffle功能,从底层来减少sort-merge 次数,提升Shuffle的效率。

image.png

Runtime Filter
类似于Spark中的Dynamic Partition Pruning(DPP),但是其比DPP功能更强大。除了DPP能处理的分析表之外,Runtime Filter还可以处理非分析表。其基本原理是运行时动态裁剪数据,避免不必要的计算。比如,面对一个join查询,无法通过value下推到存储层而将数据过滤,逻辑推算的时候无法预知最后的数据量级。这种情况下如果是分析表,Runtime Filter首先会估计其中一个表中参与join操作的数据量,如果数据量较小,则提前进行数据筛选,再推送到另一侧做数据过滤;而对于非分析表,会引入Filter,如BloomFilter获得Min或Max的统计信息,

image.png

根据这些统计信息,将备选数据比较少的一侧提取出来,推到另一侧进行过滤。Runtime Filter的成本很小,只需要在优化器中进行简单评估,却可以带来显著的性能提升。如下图所示,Runtime Filter实现了35%左右的整体性能提升。该特性已经在Spark提交了PR(SPARK-27227)。
Enhanced Join Recorder
大家都知道,算子执行顺序可能会极大地影响sql的执行效率,这种情况下优化的核心原则是改变算子的执行顺序,尽早地过滤数据。

比如下图左上角的例子中,如果最底层两个表非常大的话,则这两张表join的开销会非常大,join后的大数据再去join小表,大数据一层一层地传递下去,就会影响整个流程的执行效率。此时,优化的思想是先将大表中一些无关的数据过滤掉,减少往下游传递的数据量。针对该问题,Spark使用的是动态规划算法,但其只适用于表的数量比较少的情况,如果表的数量大于12,该算法就束手无策。面对表的数量比较多的情况,EMR提供了多表join的遗传算法,其可以将原来的动态规划算法的2n的复杂度降到线性的量级,能完成成百上千张表的join。

下图右上角可以看到,Query64有18个表参与join,动态规划算法优化时间就需要耗费1400秒,而多表join的遗传算法仅需要20秒左右就可完成。Join Recorder另外一个重要的功能是外连接重排算法,大家都知道sql中外连接不能随意交换顺序的,但这并不代表不能交换顺序,比如A left join B, 然后再left join C,事实上在某种条件下其顺序是可交换的。在Spark中,外连接的优化是直接被放弃掉,而EMR则根据现有研究找到了顺序可交换的充分必要条件,实现了外连接重排算法(如下图左下角所示),对外连接的执行效率有了质的提升(下图右下角)
image.png

Relational Cache
Spark原本的Cache存在几个局限点,其一Spark的Cache是session级别,如果发现某一个Query的片段使用比较频繁,就会对为这个session创建一个cache,但是session结束后,cache就会消失;其二Spark的Cache是存储在本机上,而不是分布式存储,因此无法做到通用。在此基础上,EMR平台实现了Relational Cache,对任意Spark表,视图或者Dataset等关系型数据抽象的数据实体都创建cache, 类似于物化视图(Materialized View),但是比物化视图功能要丰富。Relational Cache的使用场景包括a)亚秒级响应MOLAP引擎;b)交互式BI,Dashboard;c)数据同步;d)数据预组织。

image.png

Relational Cache的创建过程如下,其语法与Spark sql常见的DDL类似。首先CACHE一个表或视图,然后指定Relational Cache的更新策略(DEMAND或COMMIT)、是否用于后续优化、Cache数据的存储方式以及Cache的视图逻辑。Relational Cache支持cache任意Table、View,支持cache到内存、HDFS、OSS等任意数据源,JSON、ORC、Parquet等任意数据格式。

image.png

Relational Cache还支持对用户输入的sql的优化。原来的Spark sql Cache对于用户输入的sql优化非常僵硬死板,用户输入的sql必须精确匹配上Cache才能使用。而Relational Cache则完全不同,如果有a、b、c、d四个表join的cache,当又有a、b、e三个表join的情况下,a、b join的结果便可以从四个表join时生成的Cache数据中读取。下图中右侧展示了Cache和没有Cache的基准测试结果,可以看出Relational Cache可以保证测试的响应时间在亚秒级。
请参考 Spark Relational Cache实现亚秒级响应的交互式分析

image.png

Spark Transaction:有些用户可能会使用Hive表,Hive表有事务支持,然而Spark在事务这一项上是不兼容Hive的。因此,为了满足用户数据订正/删除以及数据流导入的场景支持,EMR平台提供了Spark Transaction支持事务的ACID支持。

传统的数据导入是分批的,比如一天一导入,而流数据导入场景下数据是实时写入的原始数据,并未经过任何处理,因此会有delete和update的需求。Spark Transaction整体来讲是一种锁+MVCC的实现形式,MVCC与底层的存储密不可分。大数据在Hive和Spark兼容的情况下,都是文件的形式存在目录中,文件的版本通过行来控制,写入的每一行都会加上Meta Columns,如op、original_write-id、bucket id和row_id等,来标识这是全表唯一的一行。当需要更新某一行的时候,并不会原地更新该行,而是将该行取出来,重写后产生新的版本进行存储。读取的时候,多版本会进行合并后返回给用户。

image.png

###2) Jindo-FS
EMR早期推出了一种本地盘机型,使用这种机型来部署集群类似于用本地集群在云下部署大数据发行版,价格较高;此外由于当时HDFS有元数据瓶颈,本地存储的动态化伸缩面临很大的挑战。针对这方面的问题,解决的方案是计算与存储分离,将数据存储在OSS上,但是这种分离带来的直接结果就是性能变差,因为OSS元数据操作耗时,读取数据跨网络,传输带宽也会严重影响性能。

image.png

进而的解决方案是将数据从远端拉取到计算侧进行缓存,这也是Jindo-FS做的事情。Jindo-FS是类似于HDFS的系统,其架构也类似于HDFS的Master-Slave架构,分成Name Service 和Storage Service。它支持将某些访问频率比较高的表可以放到RocksDB中进行多级缓存。Jindo-FS整体不同于HDFS的Master结点, Jindo-FS的“Master”(Name Service)是一个分布式集群,使用raft 协议,提供入口服务;提供多Name Space支持;元数据以kv形式存放于高性能kv store 中;因为其本身不存储数据,真实数据在OSS和OTS中,因此支持数据的弹性扩展和销毁重建。

image.png

Jindo-FS底层的元数据管理会将数据拆成一系列的kv,通过递增的id来逐层查询。如/home/Hadoop/file1.txt需要读三次OTS。下图右侧的测试结果说明Jindo-FS在元数据操作方面相对于OSS有较好的性能提升。

image.png

Jindo-FS使用Storage Service来进行底层存储,在写流程中Storage Service将要写的文件同时存储到本地和OSS中,然后再返回给用户写的结果,同时还会在集群结点内进行多副本传输;而读操作和HDFS类似,如果命中本地,则在本地存储中读取,否则要进行远程读取。Storage Service具备高性能、高可靠、高可用、弹性存储等特性,为了支持高性能,Jindo-FS建立了数据流高速通道,同时还有一系列的策略,如减少内存拷贝次数等。

image.png

Jindo-FS中Name Service如何实现高可靠、如何进行热点数据发现与缓存替换、块存储模式与缓存模式;以及Storage Service如何应对读写失败、数据块如何设计并存储、如何实现高速数据通道等问题,请参见大数据生态专场《云上大数据的高效能数据库的存储方案》的分享。

image.png

相关文章:
JindoFS概述:云原生的大数据计算存储分离方案

JindoFS解析 - 云上大数据高性能数据湖存储方案


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!二维码.JPG

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
4月前
|
运维 Cloud Native 安全
云原生技术:重塑现代IT架构的引擎
在当今数字化时代,企业正面临着前所未有的挑战与机遇。随着云计算技术的不断发展,云原生技术作为其核心驱动力之一,正在彻底改变企业的IT架构和运营模式。本文将深入探讨云原生技术的内涵、特点及其对企业数字化转型的影响,揭示其在现代IT架构中的核心地位和作用。同时,我们还将分析云原生技术面临的安全挑战,并展望未来的发展趋势,为企业在云原生领域的实践提供有益的参考。
|
3天前
|
人工智能 分布式计算 Cloud Native
云原生数据仓库AnalyticDB:深度智能化的数据分析洞察
云原生数据仓库AnalyticDB(ADB)是一款深度智能化的数据分析工具,支持大规模数据处理与实时分析。其架构演进包括存算分离、弹性伸缩及性能优化,提供zero-ETL和APS等数据融合功能。ADB通过多层隔离保障负载安全,托管Spark性能提升7倍,并引入AI预测能力。案例中,易点天下借助ADB优化广告营销业务,实现了30%的任务耗时降低和20%的成本节省,展示了云原生数据库对出海企业的数字化赋能。
|
1月前
|
运维 监控 安全
公司监控软件:SAS 数据分析引擎驱动网络异常精准检测
在数字化商业环境中,企业网络系统面临复杂威胁。SAS 数据分析引擎凭借高效处理能力,成为网络异常检测的关键技术。通过统计分析、时间序列分析等方法,SAS 帮助企业及时发现并处理异常流量,确保网络安全和业务连续性。
55 11
|
2月前
|
Kubernetes Cloud Native 调度
云原生批量任务编排引擎Argo Workflows发布3.6,一文解析关键新特性
Argo Workflows是CNCF毕业项目,最受欢迎的云原生工作流引擎,专为Kubernetes上编排批量任务而设计,本文主要对最新发布的Argo Workflows 3.6版本的关键新特性做一个深入的解析。
|
4月前
|
消息中间件 存储 Cloud Native
基于 RocketMQ 的云原生 MQTT 消息引擎设计
本文将介绍阿里云如何将 Serverless 架构应用于消息队列,有效降低运营成本,同时利用云原生环境的特性,为 IoT 设备提供快速响应和灵活伸缩的通讯能力。
263 30
|
3月前
|
运维 Cloud Native 安全
云原生架构:企业数字化转型的新引擎##
【10月更文挑战第2天】 在当今数字化浪潮中,云原生架构以其独特的优势成为企业实现高效、灵活和创新的核心驱动力。本文深入探讨了云原生的概念、核心技术如容器化、微服务和DevOps等,并分析了这些技术如何共同作用,推动企业在云平台上实现快速迭代、弹性扩展和资源优化。同时,文章还阐述了云原生在实际应用中面临的挑战及相应的解决策略,为企业的数字化转型提供全面而深入的指导。 ##
67 17
|
3月前
|
运维 Cloud Native 持续交付
探索云原生架构:企业数字化转型的新引擎
在当今数字化浪潮中,云原生架构以其独特的优势成为企业转型的关键。它通过容器化、微服务、DevOps和持续交付等技术,使企业能够快速响应市场变化,实现应用的高效开发、部署和运维。本文将深入探讨云原生的概念、核心技术及其在现代IT环境中的重要性。
|
3月前
|
Kubernetes 监控 Cloud Native
探索云原生架构:企业数字化转型的新引擎
【10月更文挑战第5天】 在当今数字化浪潮中,云原生架构以其独特的优势成为企业实现高效、灵活和可扩展的关键。本文将深入探讨云原生的核心概念、关键技术以及实际应用案例,揭示其在推动企业数字化转型中的重要作用。
50 6
|
3月前
|
运维 Kubernetes Cloud Native
探索云原生架构:企业数字化转型的新引擎
【10月更文挑战第9天】 在当今数字化浪潮中,云原生架构以其独特的优势成为企业实现高效运营和快速创新的关键。本文将深入探讨云原生的核心概念、关键技术以及实际应用案例,揭示其如何助力企业加速数字化转型步伐。通过对云原生技术的剖析,我们将看到这一新兴架构是如何重新定义软件开发、部署和运维模式的,进而推动企业在激烈的市场竞争中脱颖而出。
|
3月前
|
运维 Cloud Native 持续交付
云原生技术:企业数字化转型的新引擎##
随着数字化浪潮的不断推进,企业对于高效、灵活的IT基础设施需求日益增长。云原生技术作为一种新型的软件开发和部署范式,正逐渐成为企业数字化转型的重要驱动力。本文将深入探讨云原生技术的核心概念、关键技术以及其在企业数字化转型中的作用和优势,旨在为读者提供清晰、全面的认识。 ##