大数据开发!Pandas转spark无痛指南!⛵

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Pandas灵活强大,是数据分析必备工具库!但处理大型数据集时,需过渡到PySpark才可以发挥并行计算的优势。本文总结了Pandas与PySpark的核心功能代码段,掌握即可丝滑切换。
55e532b24ae492ce80eb9eed1999e7e3.png
💡 作者: 韩信子@ ShowMeAI
📘 大数据技术◉技能提升系列https://www.showmeai.tech/tutorials/84
📘 数据分析实战系列https://www.showmeai.tech/tutorials/40
📘 本文地址https://www.showmeai.tech/article-detail/338
📢 声明:版权所有,转载请联系平台与作者并注明出处
📢 收藏 ShowMeAI查看更多精彩内容
97114f0b44f1ce7d0206ace5f5934df0.png

Pandas 是每位数据科学家和 Python 数据分析师都熟悉的工具库,它灵活且强大具备丰富的功能,但在处理大型数据集时,它是非常受限的。

这种情况下,我们会过渡到 PySpark,结合 Spark 生态强大的大数据处理能力,充分利用多机器并行的计算能力,可以加速计算。不过 PySpark 的语法和 Pandas 差异也比较大,很多开发人员会感觉这很让人头大。

1980c6d9b6b20176a9c9bbfd2975c56d.png

在本篇内容中, ShowMeAI 将对最核心的数据处理和分析功能,梳理 PySpark 和 Pandas 相对应的代码片段,以便大家可以无痛地完成 Pandas 到大数据 PySpark 的转换😉

44596b0f0da58d803dad1301f98f90e2.png
大数据处理分析及机器学习建模相关知识, ShowMeAI制作了详细的教程与工具速查手册,大家可以通过如下内容展开学习或者回顾相关知识。

📘图解数据分析:从入门到精通系列教程

📘图解大数据技术:从入门到精通系列教程

📘图解机器学习算法:从入门到精通系列教程

📘数据科学工具库速查表 | Spark RDD 速查表

📘数据科学工具库速查表 | Spark SQL 速查表

💡 导入工具库

在使用具体功能之前,我们需要先导入所需的库:

# pandas vs pyspark,工具库导入
import pandas as pd
import pyspark.sql.functions as F

PySpark 所有功能的入口点是 SparkSession 类。通过 SparkSession 实例,您可以创建spark dataframe、应用各种转换、读取和写入文件等,下面是定义 SparkSession的代码模板:

from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('SparkByExamples.com')\
.getOrCreate()

💡 创建 dataframe

在 Pandas 和 PySpark 中,我们最方便的数据承载数据结构都是 dataframe,它们的定义有一些不同,我们来对比一下看看:

💦 Pandas

columns = ["employee","department","state","salary","age"]
data = [("Alain","Sales","Paris",60000,34),
        ("Ahmed","Sales","Lyon",80000,45),
        ("Ines","Sales","Nice",55000,30),
        ("Fatima","Finance","Paris",90000,28),
        ("Marie","Finance","Nantes",100000,40)]

创建DataFrame的 Pandas 语法如下:

df = pd.DataFrame(data=data, columns=columns)
# 查看头2行
df.head(2)

💦 PySpark

创建DataFrame的 PySpark 语法如下:

df = spark.createDataFrame(data).toDF(*columns)
# 查看头2行
df.limit(2).show()

💡 指定列类型

💦 Pandas

Pandas 指定字段数据类型的方法如下:

types_dict = {
    "employee": pd.Series([r[0] for r in data], dtype='str'),
    "department": pd.Series([r[1] for r in data], dtype='str'),
    "state": pd.Series([r[2] for r in data], dtype='str'),
    "salary": pd.Series([r[3] for r in data], dtype='int'),
    "age": pd.Series([r[4] for r in data], dtype='int')
}

df = pd.DataFrame(types_dict)

Pandas 可以通过如下代码来检查数据类型:

df.dtypes

💦 PySpark

PySpark 指定字段数据类型的方法如下:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

schema = StructType([ \
    StructField("employee",StringType(),True), \
    StructField("department",StringType(),True), \
    StructField("state",StringType(),True), \
    StructField("salary", IntegerType(), True), \
    StructField("age", IntegerType(), True) \
  ])

df = spark.createDataFrame(data=data,schema=schema)

PySpark 可以通过如下代码来检查数据类型:

df.dtypes
# 查看数据类型 
df.printSchema() 

💡 读写文件

Pandas 和 PySpark 中的读写文件方式非常相似。 具体语法对比如下:

💦 Pandas

df = pd.read_csv(path, sep=';', header=True)
df.to_csv(path, ';', index=False)

💦 PySpark

df = spark.read.csv(path, sep=';')
df.coalesce(n).write.mode('overwrite').csv(path, sep=';')

注意 ①

PySpark 中可以指定要分区的列:

df.partitionBy("department","state").write.mode('overwrite').csv(path, sep=';')

注意 ②

可以通过上面所有代码行中的 parquet 更改 CSV 来读取和写入不同的格式,例如 parquet 格式

💡 数据选择 - 列

💦 Pandas

在 Pandas 中选择某些列是这样完成的:

columns_subset = ['employee', 'salary']

df[columns_subset].head()

df.loc[:, columns_subset].head()

💦 PySpark

在 PySpark 中,我们需要使用带有列名列表的 select 方法来进行字段选择:

columns_subset = ['employee', 'salary']

df.select(columns_subset).show(5)

💡 数据选择 - 行

💦 Pandas

Pandas可以使用 iloc对行进行筛选:

# 头2行
df.iloc[:2].head()

💦 PySpark

在 Spark 中,可以像这样选择前 n 行:

df.take(2).head()
# 或者
df.limit(2).head()

注意:使用 spark 时,数据可能分布在不同的计算节点上,因此“第一行”可能会随着运行而变化。

💡 条件选择

💦 Pandas

Pandas 中根据特定条件过滤数据/选择数据的语法如下:

# First method
flt = (df['salary'] >= 90_000) & (df['state'] == 'Paris')
filtered_df = df[flt]

# Second Method: Using query which is generally faster
filtered_df = df.query('(salary >= 90_000) and (state == "Paris")')
# Or
target_state = "Paris"
filtered_df = df.query('(salary >= 90_000) and (state == @target_state)')

💦 PySpark

在 Spark 中,使用 filter方法或执行 SQL 进行数据选择。 语法如下:

# 方法1:基于filter进行数据选择
filtered_df = df.filter((F.col('salary') >= 90_000) & (F.col('state') == 'Paris'))

# 或者
filtered_df = df.filter(F.expr('(salary >= 90000) and (state == "Paris")'))

# 方法2:基于SQL进行数据选择
df.createOrReplaceTempView("people")

filtered_df = spark.sql("""
SELECT * FROM people
WHERE (salary >= 90000) and (state == "Paris")
""") 

💡 添加字段

💦 Pandas

在 Pandas 中,有几种添加列的方法:

seniority = [3, 5, 2, 4, 10]
# 方法1
df['seniority'] = seniority

# 方法2
df.insert(2, "seniority", seniority, True)

💦 PySpark

在 PySpark 中有一个特定的方法withColumn可用于添加列:

seniority = [3, 5, 2, 4, 10]
df = df.withColumn('seniority', seniority)

💡 dataframe拼接

💦 2个dataframe - pandas

# pandas拼接2个dataframe
df_to_add = pd.DataFrame(data=[("Robert","Advertisement","Paris",55000,27)], columns=columns)
df = pd.concat([df, df_to_add], ignore_index = True)

💦 2个dataframe - PySpark

# PySpark拼接2个dataframe
df_to_add = spark.createDataFrame([("Robert","Advertisement","Paris",55000,27)]).toDF(*columns)
df = df.union(df_to_add)

💦 多个dataframe - pandas

# pandas拼接多个dataframe
dfs = [df, df1, df2,...,dfn]
df = pd.concat(dfs, ignore_index = True)

💦 多个dataframe - PySpark

PySpark 中 unionAll 方法只能用来连接两个 dataframe。我们使用 reduce 方法配合unionAll来完成多个 dataframe 拼接:

# pyspark拼接多个dataframe
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

dfs = [df, df1, df2,...,dfn]
df = unionAll(*dfs)

💡 简单统计

Pandas 和 PySpark 都提供了为 dataframe 中的每一列进行统计计算的方法,可以轻松对下列统计值进行统计计算:

  • 列元素的计数
  • 列元素的平均值
  • 最大值
  • 最小值
  • 标准差
  • 三个分位数:25%、50% 和 75%

Pandas 和 PySpark 计算这些统计值的方法很类似,如下:

💦 Pandas & PySpark

df.summary()
#或者
df.describe()

💡 数据分组聚合统计

Pandas 和 PySpark 分组聚合的操作也是非常类似的:

💦 Pandas

df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

💦 PySpark

df.groupBy('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

但是,最终显示的结果需要一些调整才能一致。

在 Pandas 中,要分组的列会自动成为索引,如下所示:

e0c91c1cd719ba55f79fece115a85941.png

要将其作为列恢复,我们需要应用 reset_index方法:

df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'}).reset_index()
c4cc659e4a633679e0c9de79515c7ad3.png

在 PySpark 中,列名会在结果dataframe中被重命名,如下所示:

25a5644c0b17b1efa3fc4e63236b2125.png

要恢复列名,可以像下面这样使用别名方法:

df.groupBy('department').agg(F.count('employee').alias('employee'), F.max('salary').alias('salary'), F.mean('age').alias('age'))
b618ebbf373bc3fc631249214b176256.png

💡 数据转换

在数据处理中,我们经常要进行数据变换,最常见的是要对「字段/列」应用特定转换,在Pandas中我们可以轻松基于apply函数完成,但在PySpark 中我们可以使用udf(用户定义的函数)封装我们需要完成的变换的Python函数。

例如,我们对salary字段进行处理,如果工资低于 60000,我们需要增加工资 15%,如果超过 60000,我们需要增加 5%。

💦 Pandas

Pandas 中的语法如下:

df['new_salary'] = df['salary'].apply(lambda x: x*1.15 if x<= 60000 else x*1.05)

💦 Pyspark

PySpark 中的等价操作下:

from pyspark.sql.types import FloatType

df.withColumn('new_salary', F.udf(lambda x: x*1.15 if x<= 60000 else x*1.05, FloatType())('salary'))

⚠️ 请注意, udf方法需要明确指定数据类型(在我们的例子中为 FloatType)

💡 总结

本篇内容中, ShowMeAI 给大家总结了Pandas和PySpark对应的功能操作细节,我们可以看到Pandas和PySpark的语法有很多相似之处,但是要注意一些细节差异。

另外,大家还是要基于场景进行合适的工具选择:

  • 在处理大型数据集时,使用 PySpark 可以为您提供很大的优势,因为它允许并行计算。
  • 如果您正在使用的数据集很小,那么使用Pandas会很快和灵活。

参考资料

推荐阅读

e9190f41b8de4af38c8a1a0c96f0513b~tplv-k3u1fbpfcp-zoom-1.image

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
59 0
|
17天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
45 6
|
15天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
58 2
|
16天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
55 1
|
16天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
|
27天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
34 1
|
7天前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
1月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算