Virgin Hyperloop One如何使用Koalas将处理时间从几小时降到几分钟--无缝的将pandas切换成Apache Spark指南

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Virgin Hyperloop One(超级高铁公司)是一家从事超级高铁研究的公司,致力于能让高铁达到飞机的速度并且拥有更低的成本。为了能够制造一个商业的系统,我们需要收集并且分析非常大量的各种不同的数据,包括各种运行测试数据,多种模拟数据,技术设施数据,甚至社会经济数据等等。

Virgin Hyperloop One(超级高铁公司)是一家从事超级高铁研究的公司,致力于能让高铁达到飞机的速度并且拥有更低的成本。为了能够制造一个商业的系统,我们需要收集并且分析非常大量的各种不同的数据,包括各种运行测试数据,多种模拟数据,技术设施数据,甚至社会经济数据等等。我们之前绝大部分处理数据的代码都是基于pandas使用python脚本来进行处理。之所以写这篇文档是因为我们想分享我们如何使用Koalas在很少修改代码的情况下来扩展我们的处理能力和节省大量处理时间的。

随着我们的业务不断的增长我们的数据量也在不断的增长。我们的数据处理范围越来越大,复杂程度越来越高,这导致我们基于pandas的python脚本越来越慢,知道慢到不能满足我们的商业需求。所以我们调研了Spark,希望使用Spark能够带来更快的处理时间并且能够提供按需灵活弹性的能力。我们尝试这样做了,但是很快我们发现切换到Spark的过程中,我们必须付出非常多的时间把我们之前的基于pandas的python代码修改成基于PySpark的代码。我们意识到我们需要一套能够不需要修改过多代码就能迁移到Spark上的解决方案。我们非常高兴的发现了这个解决方案:Databricks最近开源的Koalas。

Kolas的Readme中是这样写的:

Koalas项目基于Apache Spark实现了pandas DataFrame API,从而使数据科学家能够更有效率的处理大数据。

(...)

如果你已经熟悉pandas,那么你不需要付出任何学习成本就能使用Spark

一份代码可以同时在pandas(用于测试,小数据集)和Spark(用于分布式datasets)两个平台上运行。

本文我讲介绍Koalas为什么值得你去尝试。只需要修改不到1%的pandas代码,我们就能将我们以前的代码跑在Koalas和Spark上。我们将处理时间提升了10倍,从几小时下降到了几分钟。并且我们具备了水平扩展的能力,这使我们能够处理更多的数据。

快速开始

在安装Koalas之前,首先我们需要一个能够运行PySpark的Spark集群。然后我们执行以下命令:

pip install koalas

如果使用conda,则执行以下命令:

conda install koalas -c conda-forge

更详细的信息可以查看Koalas的Readme文档。

安装完之后我们执行一个快速测试:

import databricks.koalas as ks
kdf = ks.DataFrame({'column1':[4.0, 8.0]}, {'column2':[1.0, 2.0]})
kdf

image

可以看到Koalas渲染出来了pandas风格的interactive tables,这太便利了。

基本操作示例:

首先我们生成了一个含有4列多行的测试数据:

import pandas as pd
## generate 1M rows of test data
pdf = generate_pd_test_data( 1e6 )
pdf.head(3)
>>>     timestamp pod_id trip_id speed_mph
0 7.522523 pod_13 trip_6 79.340006
1 22.029855 pod_5 trip_22 65.202122
2 21.473178 pod_20 trip_10 669.901507

我们先描述下我们要处理的业务,比如:每个pod-trip的时间是多少。

这需要进行以下操作:

  1. Group by ['pod_id','trip id']
  2. 对于每个trip, 计算trip_time,计算逻辑为:last timestamp – first timestamp
  3. 计算pod-trip时间分布 (mean, stddev)

Pandas方式(代码短,速度慢):

(snippet #1)

import pandas as pd
# take the grouped.max (last timestamp) and join with grouped.min (first timestamp)
gdf = pdf.groupby(['pod_id','trip_id']).agg({'timestamp': ['min','max']})
gdf.columns = ['timestamp_first','timestamp_last']
gdf['trip_time_sec'] = gdf['timestamp_last'] - gdf['timestamp_first']
gdf['trip_time_hours'] = gdf['trip_time_sec'] / 3600.0
# calculate the statistics on trip times
pd_result = gdf.describe()

使用PySpark方式(代码长,速度快):

(snippet #2)

import pyspark as spark
# import pandas df to spark (this line is not used for profiling)
sdf = spark.createDataFrame(pdf)
# sort by timestamp and groupby
sdf = sdf.sort(desc('timestamp'))
sdf = sdf.groupBy("pod_id", "trip_id").agg(F.max('timestamp').alias('timestamp_last'), F.min('timestamp').alias('timestamp_first'))
# add another column trip_time_sec as the difference between first and last
sdf = sdf.withColumn('trip_time_sec', sdf2['timestamp_last'] - sdf2['timestamp_first'])
sdf = sdf.withColumn('trip_time_hours', sdf3['trip_time_sec'] / 3600.0)
# calculate the statistics on trip times
sdf4.select(F.col('timestamp_last'),F.col('timestamp_first'),F.col('trip_time_sec'),F.col('trip_time_hours')).summary().toPandas()

使用Koalas方式(代码短,速度快):

(snippet #3)

import databricks.koalas as ks
# import pandas df to koalas (and so also spark) (this line is not used for profiling)
kdf = ks.from_pandas(pdf)
# the code below is the same as the pandas version
gdf = kdf.groupby(['pod_id','trip_id']).agg({'timestamp': ['min','max']})
gdf.columns = ['timestamp_first','timestamp_last']
gdf['trip_time_sec'] = gdf['timestamp_last'] - gdf['timestamp_first']
gdf['trip_time_hours'] = gdf['trip_time_sec'] / 3600.0
ks_result = gdf.describe().to_pandas()

注意第一段和第三段中的代码基本上一致,可以无缝迁移到Spark上面。对于大多数pandas代码,你只需要将import pandas改为databricks.koalas as pd,对于某些脚本需要微调还有些限制我们下面将会提到。

结果:
All the snippets have been verified to return the same pod-trip-times results. The describe and summary methods for pandas and Spark are slightly different, as explained here but this should not affect performance too much.
上面三段代码经过验证都返回了一致的结果。pandas和spark的describesummary方法不太一样(https://www.kdnuggets.com/2016/01/python-data-science-pandas-spark-dataframe-differences.html)但是这不影响太多性能。

示例的结果:
image

进阶示例:UDFs和复杂操作

接下来我们用相同的的datafrme来尝试解决负责的计算,从而对比出pandas和Koalas的实现上的不同。

目标:分析每个pod-trip的平均速度:

  1. Group by ['pod_id','trip id']
  2. 通过查找下方的速度(时间)表(计算方法连接)计算出每个pod-trip的总距离。
  3. 对group过的df使用timestamp列排序。
  4. 计算timestamps的差值。
  5. 将速度和时间戳的差值相乘。-- 这样可以得到在一段时间之内的行驶距离。
  6. 对于distance_travelled进行sum计算 – 这样算出每个pod-trip的距离。
  7. 计算 trip time as timestamp.last – timestamp.first
  8. 计算 average_speed as distance_travelled / trip time
  9. 计算pod-trip时间的分布 (mean, stddev)。

我们使用custom apply function和UDF (user defined functions)来实现上面的计算过程。

使用pandas的方式:

(snippet #4)

import pandas as pd
def calc_distance_from_speed( gdf ):
 gdf = gdf.sort_values('timestamp')
 gdf['time_diff'] = gdf['timestamp'].diff()
 return pd.DataFrame({
  'distance_miles':[ (gdf['time_diff']*gdf['speed_mph']).sum()],
  'travel_time_sec': [ gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0] ]
})
results = df.groupby(['pod_id','trip_id']).apply( calculate_distance_from_speed)
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

PySpark的方式:

(snippet #5)

import databricks.koalas as ks
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pyspark.sql.functions as F
schema = StructType([
 StructField("pod_id", StringType()),
 StructField("trip_id", StringType()),
 StructField("distance_miles", DoubleType()),
 StructField("travel_time_sec", DoubleType())
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calculate_distance_from_speed( gdf ):
 gdf = gdf.sort_values('timestamp')
 print(gdf)
 gdf['time_diff'] = gdf['timestamp'].diff()
 return pd.DataFrame({
  'pod_id':[gdf['pod_id'].iloc[0]],
  'trip_id':[gdf['trip_id'].iloc[0]],
  'distance_miles':[ (gdf['time_diff']*gdf['speed_mph']).sum()],
  'travel_time_sec': [ gdf['timestamp'].iloc[-1]-gdf['timestamp'].iloc[0] ]
})
sdf = spark_df.groupby("pod_id","trip_id").apply(calculate_distance_from_speed)
sdf = sdf.withColumn('distance_km',F.col('distance_miles') * 1.609)
sdf = sdf.withColumn('avg_speed_mph',F.col('distance_miles')/ F.col('travel_time_sec') / 60.0)
sdf = sdf.withColumn('avg_speed_kph',F.col('avg_speed_mph') * 1.609)
sdf = sdf.orderBy(sdf.pod_id,sdf.trip_id)
sdf.summary().toPandas() # summary calculates almost the same results as describe

使用Koalas的方式:

(snippet #6)

import databricks.koalas as ks
def calc_distance_from_speed_ks( gdf ) -> ks.DataFrame[ str, str, float , float]:
 gdf = gdf.sort_values('timestamp')
 gdf['meanspeed'] = (gdf['timestamp'].diff()*gdf['speed_mph']).sum()
 gdf['triptime'] = (gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0])
 return gdf[['pod_id','trip_id','meanspeed','triptime']].iloc[0:1]

kdf = ks.from_pandas(df)
results = kdf.groupby(['pod_id','trip_id']).apply( calculate_distance_from_speed_ks)
# due to current limitations of the package, groupby.apply() returns c0 .. c3 column names
results.columns = ['pod_id', 'trip_id', 'distance_miles', 'travel_time_sec']
# spark groupby does not set the groupby cols as index and does not sort them
results = results.set_index(['pod_id','trip_id']).sort_index()
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

Koalas是基于PySpark的pandas_udf的来实现的,这就是为什么在定义function的时候需要定义类型提示。这个包的作者介绍了新的类型提示,ks.DataFrameks.Series。但是当前的实现有些笨重并且需要进行相应的修改才能达到相同的数据结果(改变列名,groupby key不返回)。不过上面这些行为都在文档中进行了适当的说明。

性能:

为了评估Koalas的性能,我们对不同的数据量进行了测试。

这个性能测试在Databricks的平台上进行,使用了如下的配置:

  • 1个Spark driver节点 (也用来跑pandas脚本): 8 CPU cores, 61GB RAM.
  • 15个Spark worker 节点: 4CPU cores, 30.5GB RAM (一共: 60CPUs / 457.5GB内存 ).

每次测试重复10次。

基础操作

当数据很少的时候,初始化的时间和数据传输的时间相对于计算时间要长的所。所以pandas速度快一点(标记a)。当有大量数据的时候,pandas的数据处理时间要多于使用Spark的方案(标记b),我们可以观察到Koalas比PySpark有一些性能损耗,但是随着数据量的增长这个差距在减小(标记c)。
image

UDFs

image
在UDF的评测中,PySpark和Koalas文档当中描述了使用UDF会带来性能的大幅下降,所以我们将测试数据的行数减少了100倍。在每个测试用例中,Koalas和PySpark具有相同的性能,这暗示这他们使用了相同的底层实现。在实验过程中,我们发现使用PySpark windows功能性能会得到提升,但是Koalas没有此实现,所以我们仅测试UDF。

讨论

如果你想使你的pandas代码能够具有处理更大数据量和水平扩展的能力,那么Koalas将是一个正确的选择。当快速切换到Koalas上后你可以通过调整你的Spark集群的规模来处理更大的数据量并且显而易见的缩短处理时间。而且性能和PySpark差不多(根据集群规模和数据量的规模,有5%到50%的损失)。

另一方面,Koalas API相对于原生的Spark有一些损耗。如果你对性能非常敏感,你可以考虑使用Scala来实现你的处理逻辑。

限制和不同

当你刚开始接触Koalas的时候,你需要知道“为什么这个没有实现?!” 当前Koalas的包仍然在开发过程中,并且有些pandas API没有实现,但是大多数目前没有的功能会在接下来的几个月中实现(比如说groupby.diff()或者kdf.rename()

根据我作为这个项目的contributor的经验,有些功能使用Spark API实现起来过于复杂或者有较大性能损耗。比如:DataFrame.values需要在单一节点的内存中处理整个数据集,所以这是不太合适甚至是不太可能的。取而代之如果你需要在driver中检索结果,你可以调用DataFrame.to_pandas()或者DataFrame.to_numpy()

另外一个重要的事情是Koalas的执行链和pandas是不同的:当在dataframe上面执行一些操作,会将操作放入一个队列当中,只有需要results的时候才会真正执行完毕。比如:当调用kdf.head()或者kdf.to_pandas()操作的时候才会被真正执行。这可能会对一些没有使用过Spark的人造成困扰,因为pandas是按照一行一行来具体执行的。

结论

Koalas帮助我们快速的讲pandas代码迁移到Spark上。如果你在尝试扩展你的pandas代码,你可以尝试使用Koalas。如果你遇到某些问题或者需要更多的功能,你可以提issue给社区,我们将保障Koalas的活跃度并不断的改进。另外的,欢迎来一起贡献代码。

资源

  1. Koalas github: https://github.com/databricks/koalas
  2. Koalas documentation: https://koalas.readthedocs.io
  3. 文章中的代码: https://gist.github.com/patryk-oleniuk/043f97ae9c405cbd13b6977e7e6d6fbc .
  4. 原文链接:https://databricks.com/blog/2019/08/22/guest-blog-how-virgin-hyperloop-one-reduced-processing-time-from-hours-to-minutes-with-koalas.html
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
56 1
|
5月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
163 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
4月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
72 0
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
226 0
|
5月前
|
分布式计算 Apache Spark
|
6月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
187 6
|
9天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
275 33
The Past, Present and Future of Apache Flink
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
824 13
Apache Flink 2.0-preview released
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
89 3
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多
下一篇
DataWorks