[雪峰磁针石博客]pyspark工具机器学习(自然语言处理和推荐系统)2数据处理2

本文涉及的产品
NLP自然语言处理_基础版,每接口每天50万次
NLP自然语言处理_高级版,每接口累计50万次
NLP 自学习平台,3个模型定制额度 1个月
简介: 用户定义函数(UDF:User-Defined Functions) UDF广泛用于数据处理,以转换数据帧。 PySpark中有两种类型的UDF:常规UDF和Pandas UDF。 Pandas UDF在速度和处理时间方面更加强大。

图片.png

用户定义函数(UDF:User-Defined Functions)

UDF广泛用于数据处理,以转换数据帧。 PySpark中有两种类型的UDF:常规UDF和Pandas UDF。 Pandas UDF在速度和处理时间方面更加强大。

  • 传统的Python函数

>>> from pyspark.sql.functions import udf
>>> def price_range(brand):
...     prices = {"Samsung":'High Price', "Apple":'High Price', "MI":'Mid Price'}
...     return prices.get('test',"Low Price")
... 
>>> brand_udf=udf(price_range,StringType())
>>> df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
+-------+---+----------+------+-------+-----------+                             
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |Low Price  |
|4      |22 |2.5       |0     |Samsung|Low Price  |
|4      |37 |16.5      |4     |Apple  |Low Price  |
|5      |27 |9.0       |1     |MI     |Low Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|Low Price  |
|3      |22 |2.5       |0     |Apple  |Low Price  |
|3      |27 |6.0       |0     |MI     |Low Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows

>>> 
  • Lambda函数

>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
>>> df.withColumn("age_group", age_udf(df.age)).show(10,False)
+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows

图片.png

  • PandasUDF(矢量化UDF)

有两种类型的Pandas UDF:Scalar和GroupedMap。

Pandas UDF与使用基本UDf非常相似。我们必须首先从PySpark导入pandas_udf并将其应用于要转换的任何特定列。


>>> from pyspark.sql.functions import pandas_udf
>>> def remaining_yrs(age):
...     return (100-age)
... 
>>> from pyspark.sql.types import IntegerType
>>> length_udf = pandas_udf(remaining_yrs, IntegerType())
>>> df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+--------+                                
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows
  • PandasUDF(多列)

>>> def prod(rating,exp):
...     return rating*exp
... 
>>> prod_udf = pandas_udf(prod, DoubleType())
>>> df.withColumn("product",prod_udf(df['ratings'], df['experience'])).show(10,False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows

删除重复值


>>> df.count()
33
>>> df=df.dropDuplicates()
>>> df.count()
26

删除列


>>> df_new=df.drop('mobile')
>>> df_new.show()
+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows

参考资料

写数据

  • CSV

如果我们想以原始csv格式将其保存为单个文件,我们可以在spark中使用coalesce函数。


>>> write_uri = '/home/andrew/test.csv'
>>> df.coalesce(1).write.format("csv").option("header","true").save(write_uri)
  • Parquet

如果数据集很大且涉及很多列,我们可以选择对其进行压缩并将其转换为Parquet文件格式。它减少了数据的整体大小并在处理数据时优化了性能,因为它可以处理所需列的子集而不是整个数据。
我们可以轻松地将数据帧转换并保存为Parquet格式。

注意完整的数据集以及代码可以在本书的GitHub存储库中进行参考,并在onSpark 2.3及更高版本上执行最佳。

图片.png

相关文章
|
19天前
|
机器学习/深度学习 并行计算 大数据
【Python篇】深入挖掘 Pandas:机器学习数据处理的高级技巧
【Python篇】深入挖掘 Pandas:机器学习数据处理的高级技巧
45 3
|
2月前
|
机器学习/深度学习 自然语言处理 算法
【数据挖掘】百度机器学习-数据挖掘-自然语言处理工程师 历史笔试详解
文章汇总并解析了百度机器学习/数据挖掘工程师/自然语言处理工程师历史笔试题目,覆盖了多分类任务激活函数、TCP首部确认号字段、GMM-HMM模型、朴素贝叶斯模型、SGD随机梯度下降法、随机森林算法、强连通图、红黑树和完全二叉树的高度、最长公共前后缀、冒泡排序比较次数、C4.5属性划分标准、语言模型类型、分词算法、贝叶斯决策理论、样本信息熵、数据降维方法、分箱方法、物理地址计算、分时系统响应时间分析、小顶堆删除调整等多个知识点。
40 1
【数据挖掘】百度机器学习-数据挖掘-自然语言处理工程师 历史笔试详解
|
2月前
|
机器学习/深度学习 数据采集 自然语言处理
打造个性化新闻推荐系统:机器学习与自然语言处理的结合Java中的异常处理:从基础到高级
【8月更文挑战第27天】在信息过载的时代,个性化新闻推荐系统成为解决信息筛选难题的关键工具。本文将深入探讨如何利用机器学习和自然语言处理技术构建一个高效的新闻推荐系统。我们将从理论基础出发,逐步介绍数据预处理、模型选择、特征工程,以及推荐算法的实现,最终通过实际代码示例来展示如何将这些理论应用于实践,以实现精准的个性化内容推荐。
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
探索机器学习在自然语言处理中的应用
【8月更文挑战第22天】本文将深入探讨机器学习技术如何革新自然语言处理领域,从基础概念到高级应用,揭示其背后的原理和未来趋势。通过分析机器学习模型如何处理、理解和生成人类语言,我们将展示这一技术如何塑造我们的沟通方式,并讨论它带来的挑战与机遇。
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
探索机器学习中的自然语言处理技术
【7月更文挑战第40天】 随着人工智能的迅猛发展,自然语言处理(NLP)作为机器学习领域的重要分支,正逐渐改变我们与机器的互动方式。本文将深入探讨NLP的核心概念、关键技术以及在现实世界中的应用案例。我们将从基础原理出发,解析NLP如何处理和理解人类语言,并讨论最新的模型和算法如何提升NLP的性能。最后,通过几个实际应用场景的分析,展望NLP在未来可能带来的变革。
|
3月前
|
机器学习/深度学习 自然语言处理 数据挖掘
探索机器学习中的自然语言处理技术
【7月更文挑战第31天】本文深入探讨了自然语言处理(NLP)在机器学习领域的应用,包括其定义、重要性以及面临的挑战。文章进一步介绍了NLP的基本任务和常用技术,并通过实例展示了如何利用这些技术解决实际问题。最后,本文展望了NLP的未来发展方向和潜在影响。
|
2月前
|
机器学习/深度学习 自然语言处理 算法
【数据挖掘】百度机器学习-数据挖掘-自然语言处理工程师 2023届校招笔试详解
百度2023届校招机器学习/数据挖掘/自然语言处理工程师笔试的题目详解
68 1
|
3月前
|
机器学习/深度学习 数据采集 存储
机器学习在推荐系统中的应用
【7月更文挑战第31天】随着机器学习技术的不断发展和普及,推荐系统在电子商务、社交媒体、新闻资讯等领域的应用越来越广泛。机器学习算法的应用为推荐系统优化提供了全新的思路和方法,使得推荐系统能够更加智能化和个性化地为用户提供服务。未来,随着数据量的不断增加和算法的不断创新,推荐系统将会变得更加精准和高效,为用户带来更加优质的体验。
|
3月前
|
机器学习/深度学习 人工智能 自然语言处理
探索机器学习中的自然语言处理技术
【7月更文挑战第25天】自然语言处理(NLP)是机器学习领域的一个重要分支,它致力于让计算机能够理解、解释和生成人类语言。本文将深入探讨NLP的基本原理、关键技术以及在现实世界中的应用实例,旨在为读者提供一个全面的NLP技术概览,并展示其在现代科技中的重要性和应用前景。
|
3月前
|
机器学习/深度学习 数据采集 数据处理
重构数据处理流程:Pandas与NumPy高级特性在机器学习前的优化
【7月更文挑战第14天】在数据科学中,Pandas和NumPy是数据处理的关键,用于清洗、转换和计算。用`pip install pandas numpy`安装后,Pandas的`read_csv`读取数据,`fillna`处理缺失值,`drop`删除列。Pandas的`apply`、`groupby`和`merge`执行复杂转换。NumPy加速数值计算,如`square`进行向量化操作,`dot`做矩阵乘法。结合两者优化数据预处理,提升模型训练效率和效果。
55 1