PySpark ML(转换器)

简介: PySpark转换器详解

PySpark ML(转换器)

在PySpark中包含了两种机器学习相关的包:MLlib和ML,二者的主要区别在于MLlib包的操作是基于RDD的,ML包的操作是基于DataFrame的。根据之前我们叙述过的DataFrame的性能要远远好于RDD,并且MLlib已经不再被维护了,所以在本专栏中我们将不会讲解MLlib。

ML简介

在ML包中主要包含了三个主要的抽象类:转换器、评估器、管道,本文先来介绍第一种抽象类——转换器。

转换器

在PySpark中,我们通常通过将一个新列附加到DataFrame来转换数据。

  • Binarizer()
  • 用处:根据指定的阈值将连续变量转换为对应的二进制值。
  • 使用方法示例:
from pyspark.ml.feature import Binarizer
df = spark.createDataFrame([(0.5, ), (1.0, ), (1.5, )], ['values'])
binarizer = Binarizer(threshold=0.7, inputCol="values", outputCol="features")
binarizer.transform(df).show()

# 结果展示
+------+--------+
|values|features|
+------+--------+
|   0.5|     0.0|
|   1.0|     1.0|
|   1.5|     1.0|
+------+--------+
  • Bucketizer()
  • 用处:将连续变量离散化到指定的范围区间。
  • 使用方法示例:
from pyspark.ml.feature import Bucketizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),
          (float("nan"), )]
df = spark.createDataFrame(values, ["values"])
# splits 为分类区间
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4,
                                float("inf")],
                        inputCol="values",
                        outputCol="buckets")
bucketed = bucketizer.setHandleInvalid("keep").transform(df)
bucketed.show()

# 结果展示
+------+-------+
|values|buckets|
+------+-------+
|   0.1|    0.0|
|   0.4|    0.0|
|   1.2|    1.0|
|   1.5|    2.0|
|   NaN|    3.0|
|   NaN|    3.0|
+------+-------+
  • ChiSqSelector()
  • 用处:使用卡方检验完成选择。
  • 使用方法示例:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import ChiSqSelector
df = spark.createDataFrame([(Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
                            (Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
                            (Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)],
                           ["features", "label"])
selector = ChiSqSelector(numTopFeatures=2, outputCol="selectedFeatures")
model = selector.fit(df)
model.transform(df).show()

# 结果展示
+------------------+-----+----------------+
|          features|label|selectedFeatures|
+------------------+-----+----------------+
|[0.0,0.0,18.0,1.0]|  1.0|      [18.0,1.0]|
|[0.0,1.0,12.0,0.0]|  0.0|      [12.0,0.0]|
|[1.0,0.0,15.0,0.1]|  0.0|      [15.0,0.1]|
+------------------+-----+----------------+
  • CountVectorizer()
  • 用处:从数据集中学习某种模式,对数据进行标记
  • 使用方法示例:
from pyspark.ml.feature import CountVectorizer
df = spark.createDataFrame([(0, ["a", "b", "c"]),
                            (1, ["a", "b", "b", "c", "a"])], ["label", "raw"])
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df)
model.transform(df).show(truncate=False)

# 结果展示
+-----+---------------+-------------------------+
|label|raw            |vectors                  |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+
  • ElementwiseProduct()
  • 用处:返回传入向量和参数scalingVec的乘积
  • 使用方法示例:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([2.0, 1.0, 3.0]), )], ["values"])
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]),
                        inputCol="values",
                        outputCol="eprod")
ep.transform(df).show()
ep.setParams(scalingVec=Vectors.dense([2.0, 3.0, 5.0])).transform(df).show()

# 结果展示
+-------------+-------------+
|       values|        eprod|
+-------------+-------------+
|[2.0,1.0,3.0]|[2.0,2.0,9.0]|
+-------------+-------------+

+-------------+--------------+
|       values|         eprod|
+-------------+--------------+
|[2.0,1.0,3.0]|[4.0,3.0,15.0]|
+-------------+--------------+
  • MaxAbsScaler()
  • 用处:将数据调整到[-1,1]范围内(不会移动数据的中心)
  • 使用方法示例:
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([1.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()

# 结果展示
+-----+------+
|    a|scaled|
+-----+------+
|[1.0]| [0.5]|
|[2.0]| [1.0]|
+-----+------+
  • MinMaxScaler()
  • 用处:将数据缩放到[0,1]范围内(最大最小归一化)。
  • 使用方法示例:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
print(model.originalMin, model.originalMax)
model.transform(df).show()

# 结果展示
[0.0] [2.0]
+-----+------+
|    a|scaled|
+-----+------+
|[0.0]| [0.0]|
|[2.0]| [1.0]|
+-----+------+
  • NGram()
  • 用处:返回NGram算法后的结果。
  • 使用方法示例:
from pyspark.ml.feature import NGram
from pyspark.sql import Row
df = spark.createDataFrame([Row(inputTokens=["a", "b", "c", "d", "e"])])
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df).show()

# 结果展示
+---------------+--------------------+
|    inputTokens|              nGrams|
+---------------+--------------------+
|[a, b, c, d, e]|[a b, b c, c d, d e]|
+---------------+--------------------+
  • Normalizer()
  • 用处:使用p范数将数据缩放为单位范数(默认为L2)。
  • 使用方法示例:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
svec = Vectors.sparse(4, {1: 4.0, 3: 3.0})
df = spark.createDataFrame([(Vectors.dense([3.0, -4.0]), svec)],
                           ["dense", "sparse"])
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).show()

# 结果展示
+----------+-------------------+----------+
|     dense|             sparse|  features|
+----------+-------------------+----------+
|[3.0,-4.0]|(4,[1,3],[4.0,3.0])|[0.6,-0.8]|
+----------+-------------------+----------+
  • OneHotEncoderEstimator()
  • 用处:将分类列编码为二进制向量列(独热编码)。
  • 使用方法示例:
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(0.0, ), (1.0, ), (2.0, )], ["input"])
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df).show()

# 结果展示
+-----+-------------+
|input|       output|
+-----+-------------+
|  0.0|(2,[0],[1.0])|
|  1.0|(2,[1],[1.0])|
|  2.0|    (2,[],[])|
+-----+-------------+
  • PCA()
  • 用处:使用主成分分析执行数据降维(PCA算法)。
  • 使用方法示例:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]), ),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]), ),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]), )]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df).show(truncate=0)

# 结果展示
+---------------------+----------------------------------------+
|features             |pca_features                            |
+---------------------+----------------------------------------+
|(5,[1,3],[1.0,7.0])  |[1.6485728230883807,-4.013282700516296] |
|[2.0,0.0,3.0,4.0,5.0]|[-4.645104331781534,-1.1167972663619026]|
|[4.0,0.0,0.0,6.0,7.0]|[-6.428880535676489,-5.337951427775355] |
+---------------------+----------------------------------------+
  • QuantileDiscretizer()
  • 用处:传入一个numBuckets参数,该方法通过计算数据的近似分位数来决定分隔应该是什么。
  • 使用方法示例:
from pyspark.ml.feature import QuantileDiscretizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),
          (float("nan"), )]
df = spark.createDataFrame(values, ["values"])
qds = QuantileDiscretizer(numBuckets=2,
                          inputCol="values",
                          outputCol="buckets",
                          relativeError=0.01,
                          handleInvalid="error")
bucketizer = qds.fit(df)
qds.setHandleInvalid("keep").fit(df).transform(df).show()

# 结果展示
+------+-------+
|values|buckets|
+------+-------+
|   0.1|    0.0|
|   0.4|    1.0|
|   1.2|    1.0|
|   1.5|    1.0|
|   NaN|    2.0|
|   NaN|    2.0|
+------+-------+
  • RegexTokenizer()
  • 用处:使用正则表达式的字符串分词器。
  • 使用方法示例:
from pyspark.ml.feature import RegexTokenizer
df = spark.createDataFrame([("A B  c", )], ["text"])
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df).show()

# 结果展示
+------+---------+
|  text|    words|
+------+---------+
|A B  c|[a, b, c]|
+------+---------+
  • StandardScaler()
  • 用处:数据标准化。
  • 使用方法示例:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
print(model.mean, model.std)
model.transform(df).show()

# 结果展示
[1.0] [1.4142135623730951]
+-----+-------------------+
|    a|             scaled|
+-----+-------------------+
|[0.0]|              [0.0]|
|[2.0]|[1.414213562373095]|
+-----+-------------------+
  • StopWordsRemover()
  • 用处:从标记文本中删除停用词。
  • 使用方法示例:
from pyspark.ml.feature import StopWordsRemover
df = spark.createDataFrame([(["a", "b", "c"], )], ["text"])
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df).show()

# 结果展示
+---------+------+
|     text| words|
+---------+------+
|[a, b, c]|[a, c]|
+---------+------+
  • Tokenizer()
  • 用处:将字符串转成小写,然后以空格为分隔符分词。
  • 使用方法示例:
from pyspark.ml.feature import Tokenizer
df = spark.createDataFrame([("ASD VA c", )], ["text"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df).show()

# 结果展示
+--------+------------+
|    text|       words|
+--------+------------+
|ASD VA c|[asd, va, c]|
+--------+------------+
  • VectorSlicer()
  • 用处:给定一个索引列表,从特征向量中提取值(作用于特征向量,不管是密集的还是稀疏的)。
  • 使用方法示例:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([-2.0, 2.3, 0.0, 0.0, 1.0]), ),
                            (Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0]), ),
                            (Vectors.dense([0.6, -1.1, -3.0, 4.5, 3.3]), )],
                           ["features"])
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).show(truncate=0)

# 结果展示
+-----------------------+----------+
|features               |sliced    |
+-----------------------+----------+
|[-2.0,2.3,0.0,0.0,1.0] |[2.3,1.0] |
|[0.0,0.0,0.0,0.0,0.0]  |[0.0,0.0] |
|[0.6,-1.1,-3.0,4.5,3.3]|[-1.1,3.3]|
+-----------------------+----------+
  • VectorAssembler()
  • 用处:将多个数字(包括向量)列合并为一列向量。
  • 使用方法示例:
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df).show()

# 结果展示
+---+---+---+-------------+
|  a|  b|  c|     features|
+---+---+---+-------------+
|  1|  0|  3|[1.0,0.0,3.0]|
+---+---+---+-------------+
  • Word2Vec()
  • 用处:将一个句子(字符串)作为输入,将其转换为{string, vector}格式的映射。
  • 使用方法示例:
from pyspark.ml.feature import Word2Vec
sent = ("a b " * 100 + "a c " * 10).split(" ")
doc = spark.createDataFrame([(sent, ), (sent, )], ["sentence"])
word2Vec = Word2Vec(vectorSize=5,
                    seed=42,
                    inputCol="sentence",
                    outputCol="model")
model = word2Vec.fit(doc)
model.getVectors().show()

# 结果展示
+----+--------------------+
|word|              vector|
+----+--------------------+
|   a|[0.09461779892444...|
|   b|[1.15474212169647...|
|   c|[-0.3794820010662...|
+----+--------------------+

小结

在PySpark中转换器实现的功能类似于数据转换,这种生成新数据的数据预处理形式是我们在进行机器学习任务过程中重要的一部分呢,下一篇我们将会讲解第二种抽象类:估计器。

相关文章
|
3月前
|
分布式计算 大数据 数据处理
如何在 PySpark 中实现自定义转换
【8月更文挑战第14天】
45 4
|
机器学习/深度学习 数据采集 存储
初探 Spark ML 第一部分
初探 Spark ML 第一部分
121 1
|
机器学习/深度学习 算法 数据挖掘
PySpark ML (评估器)
PySpark 估计器使用方法介绍
PySpark ML (评估器)
|
存储 分布式计算 Spark
PySpark|RDD编程基础
PySpark数据结构RDD编程基础
PySpark|RDD编程基础
|
机器学习/深度学习 人工智能 分布式计算
PySpark数据分析基础:PySpark原理详解
PySpark数据分析基础:PySpark原理详解
422 1
PySpark数据分析基础:PySpark原理详解
|
机器学习/深度学习 分布式计算 算法
PySpark ML——分布式机器学习库
继续PySpark学习之路,本篇开启机器学习子模块的介绍,不会更多关注机器学习算法原理,仅对ML库的基本框架和理念加以介绍。最后用一个小例子实战对比下sklearn与pyspark.ml库中随机森林分类器效果。
638 0
PySpark ML——分布式机器学习库
|
分布式计算 Java Apache
Spark - ml.dmlc.xgboost4j / spark 版本匹配与 NoSuchMethodError 解决
spark 项目引入 ml.dmlc.xgboost4j 训练并读取 xgboost 模型,load 模型期间报错 NoSuchMethodError,通过源码的分析得到 xgboost 与 spark 版本对应关系。
374 0
Spark - ml.dmlc.xgboost4j / spark 版本匹配与 NoSuchMethodError 解决
|
分布式计算 Java 网络安全
【Spark】(task1)PySpark基础数据处理
)Scala 是一门多范式(multi-paradigm)的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。 Scala 运行在 Java 虚拟机上,并兼容现有的 Java 程序。
322 0
【Spark】(task1)PySpark基础数据处理
|
机器学习/深度学习 SQL 数据采集
Spark 中 ML 和 MLlib 的特点和区别
Spark 中 ML 和 MLlib 的特点和区别
407 0
|
机器学习/深度学习 分布式计算 算法