并行计算框架Polars、Dask的数据处理性能对比

简介: 在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。

本文我们使用两个类似的脚本来执行提取、转换和加载(ETL)过程。

测试内容

这两个脚本主要功能包括:

从两个parquet 文件中提取数据,对于小型数据集,变量path1将为“yellow_tripdata/ yellow_tripdata_2014-01”,对于中等大小的数据集,变量path1将是“yellow_tripdata/yellow_tripdata”。对于大数据集,变量path1将是“yellow_tripdata/yellow_tripdata*.parquet”;

进行数据转换:a)连接两个DF,b)根据PULocationID计算行程距离的平均值,c)只选择某些条件的行,d)将步骤b的值四舍五入为2位小数,e)将列“trip_distance”重命名为“mean_trip_distance”,f)对列“mean_trip_distance”进行排序

将最终的结果保存到新的文件

脚本

1、Polars

数据加载读取

 def extraction():
     """
     Extract two datasets from parquet files
     """
     path1="yellow_tripdata/yellow_tripdata_2014-01.parquet"
     df_trips= pl_read_parquet(path1,)
     path2 = "taxi+_zone_lookup.parquet"
     df_zone = pl_read_parquet(path2,)

     return df_trips, df_zone

 def pl_read_parquet(path, ):
     """
     Converting parquet file into Polars dataframe
     """
     df= pl.scan_parquet(path,)
     return df

转换函数

 def transformation(df_trips, df_zone):
     """
     Proceed to several transformations
     """
     df_trips= mean_test_speed_pl(df_trips, )

     df = df_trips.join(df_zone,how="inner", left_on="PULocationID", right_on="LocationID",)
     df = df.select(["Borough","Zone","trip_distance",])

     df = get_Queens_test_speed_pd(df)
     df = round_column(df, "trip_distance",2)
     df = rename_column(df, "trip_distance","mean_trip_distance")

     df = sort_by_columns_desc(df, "mean_trip_distance")
     return df


 def mean_test_speed_pl(df_pl,):
     """
     Getting Mean per PULocationID
     """
     df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
     return df_pl

 def get_Queens_test_speed_pd(df_pl):
     """
     Only getting Borough in Queens
     """

     df_pl = df_pl.filter(pl.col("Borough")=='Queens')

     return df_pl

 def round_column(df, column,to_round):
     """
     Round numbers on columns
     """
     df = df.with_columns(pl.col(column).round(to_round))
     return df

 def rename_column(df, column_old, column_new):
     """
     Renaming columns
     """
     df = df.rename({column_old: column_new})
     return df

 def sort_by_columns_desc(df, column):
     """
     Sort by column
     """
     df = df.sort(column, descending=True)
     return df

保存

 def loading_into_parquet(df_pl):
     """
     Save dataframe in parquet
     """
     df_pl.collect(streaming=True).write_parquet(f'yellow_tripdata_pl.parquet')

其他代码

 import polars as pl
 import time

 def pl_read_parquet(path, ):
     """
     Converting parquet file into Polars dataframe
     """
     df= pl.scan_parquet(path,)
     return df

 def mean_test_speed_pl(df_pl,):
     """
     Getting Mean per PULocationID
     """
     df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
     return df_pl

 def get_Queens_test_speed_pd(df_pl):
     """
     Only getting Borough in Queens
     """

     df_pl = df_pl.filter(pl.col("Borough")=='Queens')

     return df_pl

 def round_column(df, column,to_round):
     """
     Round numbers on columns
     """
     df = df.with_columns(pl.col(column).round(to_round))
     return df

 def rename_column(df, column_old, column_new):
     """
     Renaming columns
     """
     df = df.rename({column_old: column_new})
     return df


 def sort_by_columns_desc(df, column):
     """
     Sort by column
     """
     df = df.sort(column, descending=True)
     return df


 def main():

     print(f'Starting ETL for Polars')
     start_time = time.perf_counter()

     print('Extracting...')
     df_trips, df_zone =extraction()

     end_extract=time.perf_counter() 
     time_extract =end_extract- start_time

     print(f'Extraction Parquet end in {round(time_extract,5)} seconds')
     print('Transforming...')
     df = transformation(df_trips, df_zone)
     end_transform = time.perf_counter() 
     time_transformation =time.perf_counter() - end_extract
     print(f'Transformation end in {round(time_transformation,5)} seconds')
     print('Loading...')
     loading_into_parquet(df,)
     load_transformation =time.perf_counter() - end_transform
     print(f'Loading end in {round(load_transformation,5)} seconds')
     print(f"End ETL for Polars in {str(time.perf_counter()-start_time)}")


 if __name__ == "__main__":

     main()

2、Dask

函数功能与上面一样,所以我们把代码整合在一起:

 import dask.dataframe as dd
 from dask.distributed import Client
 import time

 def extraction():
     path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
     df_trips = dd.read_parquet(path1)
     path2 = "taxi+_zone_lookup.parquet"
     df_zone = dd.read_parquet(path2)

     return df_trips, df_zone

 def transformation(df_trips, df_zone):
     df_trips = mean_test_speed_dask(df_trips)
     df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
     df = df[["Borough", "Zone", "trip_distance"]]

     df = get_Queens_test_speed_dask(df)
     df = round_column(df, "trip_distance", 2)
     df = rename_column(df, "trip_distance", "mean_trip_distance")

     df = sort_by_columns_desc(df, "mean_trip_distance")
     return df

 def loading_into_parquet(df_dask):
     df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")

 def mean_test_speed_dask(df_dask):
     df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
     return df_dask

 def get_Queens_test_speed_dask(df_dask):
     df_dask = df_dask[df_dask["Borough"] == "Queens"]
     return df_dask

 def round_column(df, column, to_round):
     df[column] = df[column].round(to_round)
     return df

 def rename_column(df, column_old, column_new):
     df = df.rename(columns={column_old: column_new})
     return df

 def sort_by_columns_desc(df, column):
     df = df.sort_values(column, ascending=False)
     return df



 def main():
     print("Starting ETL for Dask")
     start_time = time.perf_counter()

     client = Client()  # Start Dask Client

     df_trips, df_zone = extraction()

     end_extract = time.perf_counter()
     time_extract = end_extract - start_time

     print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
     print("Transforming...")
     df = transformation(df_trips, df_zone)
     end_transform = time.perf_counter()
     time_transformation = time.perf_counter() - end_extract
     print(f"Transformation end in {round(time_transformation, 5)} seconds")
     print("Loading...")
     loading_into_parquet(df)
     load_transformation = time.perf_counter() - end_transform
     print(f"Loading end in {round(load_transformation, 5)} seconds")
     print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")

     client.close()  # Close Dask Client

 if __name__ == "__main__":
     main()

测试结果对比

1、小数据集

我们使用164 Mb的数据集,这样大小的数据集对我们来说比较小,在日常中也时非常常见的。

下面是每个库运行五次的结果:

Polars

Dask

2、中等数据集

我们使用1.1 Gb的数据集,这种类型的数据集是GB级别,虽然可以完整的加载到内存中,但是数据体量要比小数据集大很多。

Polars

Dask

3、大数据集

我们使用一个8gb的数据集,这样大的数据集可能一次性加载不到内存中,需要框架的处理。

Polars

Dask

总结

从结果中可以看出,Polars和Dask都可以使用惰性求值。所以读取和转换非常快,执行它们的时间几乎不随数据集大小而变化;

可以看到这两个库都非常擅长处理中等规模的数据集。

由于polar和Dask都是使用惰性运行的,所以下面展示了完整ETL的结果(平均运行5次)。

Polars在小型数据集和中型数据集的测试中都取得了胜利。但是,Dask在大型数据集上的平均时间性能为26秒。

这可能和Dask的并行计算优化有关,因为官方的文档说“Dask任务的运行速度比Spark ETL查询快三倍,并且使用更少的CPU资源”。

上面是测试使用的电脑配置,Dask在计算时占用的CPU更多,可以说并行性能更好。

https://avoid.overfit.cn/post/74128cd8803b43f2a51ca4ff4fed4a95

作者:Luís Oliveira

目录
相关文章
|
4月前
|
存储 并行计算 数据处理
使用GPU 加速 Polars:高效解决大规模数据问题
Polars 最新开发了 GPU 加速执行引擎,支持对超过 100GB 的数据进行交互式操作。本文详细介绍了 Polars 中 DataFrame(DF)的概念及其操作,包括筛选、数学运算和聚合函数等。Polars 提供了“急切”和“惰性”两种执行模式,后者通过延迟计算实现性能优化。启用 GPU 加速后,只需指定 GPU 作为执行引擎即可大幅提升处理速度。实验表明,GPU 加速比 CPU 上的懒惰执行快 74.78%,比急切执行快 77.38%。Polars 的查询优化器智能管理 CPU 和 GPU 之间的数据传输,简化了 GPU 数据处理。这一技术为大规模数据集处理带来了显著的性能提升。
138 4
|
5月前
|
分布式计算 并行计算 数据处理
大规模数据处理的最佳实践:使用 Dask 进行高效并行计算
【8月更文第29天】在大数据时代,高效地处理大规模数据集是至关重要的。Python 社区提供了一些强大的工具来帮助开发者进行并行和分布式计算,其中之一就是 Dask。本文将详细介绍如何使用 Dask 来优化大规模数据集的处理效率,并提供一些实用的代码示例。
800 3
|
5月前
|
并行计算 算法 大数据
Dask 与图形处理:大规模图数据的并行分析
【8月更文第29天】在大数据时代,图数据结构因其能够高效表达实体之间的复杂关系而变得越来越重要。然而,处理大规模图数据集往往需要高效的并行计算框架。Dask 是一个灵活的并行计算库,它能够与 Python 的现有科学计算生态系统无缝集成。本文将介绍如何利用 Dask 来处理和分析大规模的图数据结构。
216 4
|
5月前
|
分布式计算 并行计算 大数据
NumPy 并行计算与分布式部署
【8月更文第30天】随着数据量的不断增长,传统的单机计算模型已经难以满足对大规模数据集处理的需求。并行和分布式计算成为了处理这些大数据集的关键技术。虽然 NumPy 本身并不直接支持并行计算,但可以通过结合其他库如 Numba 和 Dask 来实现高效的并行和分布式计算。
49 1
|
6月前
|
SQL 并行计算 API
Dask是一个用于并行计算的Python库,它提供了类似于Pandas和NumPy的API,但能够在大型数据集上进行并行计算。
Dask是一个用于并行计算的Python库,它提供了类似于Pandas和NumPy的API,但能够在大型数据集上进行并行计算。
|
5月前
|
存储 分布式计算 Hadoop
分布式计算框架在大规模数据处理中的应用
【8月更文第18天】随着大数据时代的到来,对海量数据进行有效的存储、处理和分析变得越来越重要。传统的单机系统已经无法满足PB级别数据集的需求。分布式计算框架,如Apache Hadoop和Apache Spark,成为了处理这些大规模数据集的重要工具。
406 0
|
6月前
|
机器学习/深度学习 数据采集 数据处理
重构数据处理流程:Pandas与NumPy高级特性在机器学习前的优化
【7月更文挑战第14天】在数据科学中,Pandas和NumPy是数据处理的关键,用于清洗、转换和计算。用`pip install pandas numpy`安装后,Pandas的`read_csv`读取数据,`fillna`处理缺失值,`drop`删除列。Pandas的`apply`、`groupby`和`merge`执行复杂转换。NumPy加速数值计算,如`square`进行向量化操作,`dot`做矩阵乘法。结合两者优化数据预处理,提升模型训练效率和效果。
77 1
|
5月前
|
并行计算 大数据 Java
高效数据处理:使用Python实现并行计算的技巧
传统的数据处理方式在面对大数据时可能效率不高,本文探讨如何利用Python中的并行计算技术来提升数据处理速度和效率,重点介绍了多线程和多进程的应用,以及如何选择合适的场景使用这些技术。
|
7月前
|
数据采集 PyTorch 数据处理
PyTorch的数据处理
PyTorch中,`Dataset`封装自定义数据集,`DataLoader`负责批量加载和多线程读取。例如,定义一个简单的`Dataset`类,包含数据和标签,然后使用`DataLoader`指定批大小和工作线程数。数据预处理包括导入如Excel的数据,图像数据集可通过`torchvision.datasets`加载。示例展示了如何从Excel文件创建`Dataset`,并用`DataLoader`读取。
|
7月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
103 0