本场视频链接:New Developments in the Open Source Ecosystem: Apache Spark 3.0 and Koalas
Spark 发展
Spark的前身是加州伯克利分校的一名博士生于开发的一套大数据分布式处理系统,从开发、开源发展至今已经整整十年。因此,2019年对于Spark社区来说是一个很特殊的年份。
Spark的发展相当迅速,2013年其原创团队建立了Databricks,将Spark捐献给了Apache Foundation;2014年Spark成为了Apache顶级项目;2016年发布了Spark 2.0;2019年10月Spark 3.0 preview版本发布,收集用户反馈后将于2020年3月份发布Spark 3.0正式版本。
从世界著名的开发者论坛Stack Overflow的数据可以看出,2015年开始Spark每月的问题提交数量已经超越Hadoop,而2018年Spark Python版本的API PySpark每月的问题提交数量也已超过Hadoop。当前排名Spark第一,PySpark第二;而十年的累计排名是Spark第一,PySpark第三。按照这个趋势发展下去,Spark和PySpark在未来很长一段时间内应该还会处于垄断地位。
Stack Overflow于2019年发布的开发者调查报告结果显示,在开发者最受欢迎的平台中,Spark位居前列。
Spark 3.0新特性
Spark 3.0新的特性如下图所示,其中Adaptive Query Execution(AQE)在Spark 1.0版本中已经实现,但是Spark的大数据团队通过和百度、eBay等公司合作尝试解决了一系列的缺陷和不足,3.0版本中重新实现了一套新的AQE的框架,让其更好地支持实时动态优化;Spark Graph是一个新的模块,是Spark团队和世界知名的图计算公司Neo4j合作,将GraphFrames和GraphX相关的功能引入Spark Graph,推出的用于解决大数据方面的图运算功能; Accelerator-aware Scheduling是Spark和NVIDIA公司合作,推出的让Spark可以更加高效地融入显卡的加速器,来实现某种具体应用场景下的加速,与此同时,NVIDIA和Facebook都尝试在Spark中增加向量计算;Spark on Kubernetes是Google工程师主导的一个项目,他们希望实现Spark在Google的云原生运行,这个版本中该模块相对稳定;Spark作为计算引擎,没有存储,需要一套更加高效简洁的API和外部系统进行融合,在这个版本中阿里巴巴和Netflix与Spark合作,共同开发了Data Source API with Catalog Supports;ANSI SQL Compliance模块是Spark和日本电信NTT合作研发的,尤其是对于Parser部分SQL标准部分,NTT给予了很大的支持;SQL Hints是Database中比较普遍的功能;苹果公司算是Spark生态圈中最大的用户,其在Spark 3.0版本中主导了JDK11的开发支持;Hadoop 3的增加和支持主要是eBay,其用Spark来作为数据仓库。
Spark 2.X中增加了基于计算的Cost模型,但是在Spark应用环境中其表现并不如意,主要原因是:
• 一次性计算:Spark 2.X基本的应用场景是ETL,一次性计算,收集数据成本昂贵,统计信息的缺失导致基于Cost的优化基本不可能完成;
• 存储于计算分离: Spark最初设计的原则就是不拥有数据,这意味着用户可以用不同的方式增加、删除和修改,而如果统计信息错误,就无法保证基于cost的优化的正确性,甚至优化后的结果可能更差;
• 多样部署环境:基于Cost的优化依赖Cost模型,而Cost模型在Spark的运行环境是多样的,如云原生、分布式、大型机等,因此无法提出一套普遍适用于多种生态环境的Cost模型。此外,Spark中比较重要的功能UDFs支持用户根据自己需要任意加载复杂或简单的UDF实现其业务逻辑。这种情况下是完全无法计算其Cost的,相应地,优化器也就无法实现一个正确有效的优化计划。
针对上述问题,Spark 3.0版本在cost基础上收集应用在运行期间的统计信息,来帮助动态优化运行的计划。
下面主要介绍两个相关的特性:
1) Dynamic Partition Pruning
下图中展示了一个非常典型的TPCS查询,涉及了两个表,其中表1比较大,表2无关大小但是其Filter可能会减去绝大多数值,因此在join操作后实际产生的结果不会很多。
也就是说join操作的condition在执行后,会排除掉大量的行,因此在读表1的时候会浪费很多读的资源。但是基于行或cost的优化是无法动态知道表2经过筛选后能产生多少数值,因此不得不全部读取。如果考虑运行期间的统计信息,表2进行筛选后的结果就可以推送到右边子树,也就是表1。
下图是一个简单的测试,可以发现如果减少90%的scan,就可以达到33倍的加速。这种情况在Spark的TPCS大量存在,因此通过Dynamic Partition Pruning可以实现TPCS至少两倍的性能加速。
2) Adaptive Query Execution (AQE)
下图是一个经典的Spark的流程,从Parser、Analyzer、Optimizer、Planner到Query的执行。该版本中,AQE指的是图中的红线部分。当某种condition满足的情况下可以进行动态自适应规划。
下面举一个简单的例子,执行的是两个表之间的join查询。包含一个key和Filter,如t2.col2 LIKE '9999%'。在基于cost的模型中是不可能准确的知道Filter能排除多少行的,这种情况下Spark通过谓词下推,将各个条件先应用到对应的数据上,而不是根据写入的顺序执行,就可以先过滤掉部分数据,降低join等一系列操作的数据量级。
在没有动态实时运行信息的时候,保守估计判断只能用SortMergeJoin。当收集到运行时信息后会发现某个Filter事实上已经去掉了表中绝大多数行,完全可以采用BroadcastHashJoin,如果上层parent也有这种情况,就可以大大提升查询效率。
Spark生态
Spark设计之初的目标就是实现一套统一的系统,来解决与大数据相关的各式问题。就像是苹果公司在最初推出苹果手机的时候,其功能丰富性的目标引来了很多人的质疑,但是后来经过发展印证了其目标的商业价值,很多相机公司、游戏公司因为苹果手机的竞争而最终销声匿迹。我们相信Spark未来也可以在大数据领域实现能够统一解决各种问题的目标,但是目前Spark在流处理方面做得还不够好。
人工智能和大数据处理在近几年发展迅速,从最开始的数据仓库对已有数据进行响应式查询,来获得已有数据的统计信息,到现在基于已有数据进行的预测和分析,再到之后会用到人工智能和机器学习技术来辅助决策。Spark作为一个统一的大数据处理、分析引擎,将发挥很重要的作用。
Spark主要针对两个群体,即数据工程师和数据科学家。
Spark通过提供一套统一的引擎,来打破数据工程和数据科学之间的障碍,使其更加容易地进行交流和合作。
1) 数据科学
相信大家已经注意到了,在过去几年中备受争议的Python发展相当迅速,在Stack Overflow上的问题提交数量已经远远超过其他语言。在Python社区中,Pandas是绝大多数数据科学家选择的一个数据分析的library,但是Pandas只适用于单机小量级数据的分析,数据量级大的时候,性能会变差。因此当面对大数据的处理分析时,使用Pandas的数据科学家不得不重新学习大数据分析的语言和工具。
Spark在今年四月份发布了Koalas,它面向的是Pandas已有用户,可以让其无缝使用Spark来作为数据分析和计算引擎。Koalas自推出便迎来了很多用户,并且广受好评。从下图左侧的日下载量数据可以看出,目前为止日下载量已经超过一万,月下载量超过20万,这印证了社区对这个新的library的认同。与此同时,PySpark同样作为数据科学家可以使用的引擎,日下载量从最初的平均三万快速增长到目前的最高九万,月下载量目前已经达到两百万。
下图展示了Pandas DataFrame和PySpark DataFrame的区别,其底层的设计理念完全不同。Pandas DataFrame假设数据结构可以变化,PySpark DataFrame则相反。PySpark DataFrame API的设计完全是基于SQL实现的,而Pandas DataFrame则是与SQL无关的全新实现的一套语义和标准。举一个经典的例子Value count,Pandas DataFrame只有一个API便可以解决所有问题,同时Vaule Count在Pandas中支持自动Sort;但是如果Pandas用户使用PySpark,就需要SQL的语义,首先从一个已有的Frame GroupBy,然后再进行Count和OrderBy操作,这明显增加了Pandas用户的学习成本。
再举一个简单的例子来进一步说明Pandas用户使用PySpark的复杂性。Pandas读取一个读取csv文件后,设置新的列名称,同时生成一个新的列,具体操作如下图左侧所示。而使用PySpark则完全不同,首先需要声明各种相关option,然后通过toDF进行重新命名,还需要通过withColumn生成新的列。
而上述问题在Koalas中可以得到很好的解决,Pandas用户只需要导入Koalas的library,就可以在Spark中无缝使用Pandas。事实证明,Koalas是对于数据科学家使用Spark过程中一个重要的library。
2) 数据工程
Delta是基于Spark上千用户在使用过程中的痛点问题而设计开发的一款产品。它是构建data lake的开放标准,基于已有的Parquet,支持transaction来保证数据的质量,同时完全兼容Spark的API。Delta于今年四月份宣布开源,令很多用户激动不已
下图中的例子展示了如何从Parquet转换到Delta。用户只需要在代码中用delta替换Parquet,仅此而已。
Delta主要包括四个核心的特性,其中最重要的是ACID transactions,它通过乐观并发控制来实现数据的同时读、写,从而保证数据的一致性,提高了数据的质量,让批处理和流处理可以使用一套统一的数据,支持update、delete和merge等典型的数据仓库操作。还有一点不得不提的是,Spark中元数据处理已经从小数据变成了大数据,当一个表有成百上千万的partition的时候,元数据获取和处理将会耗费大量的时间,有时甚至多大几小时,而使用Delta使用Spark做元数据处理,可以大大提高效率。
下面分享三个使用Delta的社区用户场景:
• Comcast:它是一个媒体内容提供商,它使用Spark来作为数据处理引擎已经很长时间了,因为其数据量级比较大,可以达到PB级别。在用Delta代替Parquet后,由原来一条pipeline需要的640个服务器降低到64个,原来需要84个job完成的数据处理降低到现在的3个,数据延迟加速了一倍。
• Sam’s:它是沃尔玛的一个子品牌,也是Spark的重度用户。其原来没有transaction的update,因此经常会由于数据的质量问题导致pipeline停止。自从使用了Delta,transaction的问题得到了完美解决。此外,Delta提供jobs自动merge的API,帮助该用户merge部分的操作时间由原来的超过一个小时缩减到现在的小于6秒。
• Healthdirect Australia:该用户原来的数据一致性无法保证,因为Delta的使用,数据质量大幅度提升,名称匹配的准确性由原来的80%提升到了95%,数据加载速度由原来的一天缩减到二十分钟。
在Databricks使用Delta每天在客户生成环境中处理的数据量达到38PB以上,某一大型客户的生产环境每天新入库的数据量高达400TB以上。
Spark社区不断有新的项目和特性被大量地使用,本人对Spark做过贡献的个人和公司表示由衷的感谢。也希望中国的用户和社区开发人员积极加入进来,为Spark做更多的贡献。
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!