【Spark】(task4)SparkML基础(数据编码)

简介: 【导言】park是一个快速和通用的大数据引擎,可以通俗的理解成一个分布式的大数据处理框架,允许用户将Spark部署在大量廉价的硬件之上,形成集群。Spark使用scala 实现,提供了 JAVA, Python,R等语言的调用接口。本次task4学习sparkML基础(数据编码,分类,聚类模型等)。

一、构建ML Pipeline机器学习流程

如果样本较少,可以直接使用python对样本进行ML建模,但当需要大规模数据集时,可以使用spark进行分布式内存计算,虽然spark的原生语言是scala,但如果用python写可以用pyspark。

1.1 ML Pipeline构建流程

spark有MLlib机器学习库,比ML Pipeline复杂,先来大概看下ML Pipeline构建机器学习流程:

数据准备: 将特征值和预测变量整理成DataFrame

建立机器学习流程Pipeline:

StringIndexer:将文字分类特征转化为数字

OneHotEncoder:将数字分类特征转化为稀疏向量

VectorAssembler:将所有特征字段整合成一个Vector字段

DecisionTreeClassfier:训练生成模型

训练:训练集使用pipeline.fit()进行训练,产生pipelineModel

预测:使用pipelineModel.transform()预测测试集,产生预测结果

1.2 ML Pipeline组件

注意:pyspark的一些组件和python中的同名组件不完全一样:

DataFrame: 是Spark ML机器学习API处理的数据格式,可以由文本文件、RDD、或者Spark SQL创建,与python 的Dataframe概念相近但是方法完全不同。

Transformer:可以使用.transform方法将一个DataFrame转换成另一个DataFrame。

Estimator:可以使用.fit方法传入DataFrame,生成一个Transformer。

pipeline:可以串联多个Transformer和Estimator建立ML机器学习的工作流。

Parameter:以上Transformer和Estimator都可以共享的参数API。

二、数据编码

2.1 学习Spark ML中数据编码模块

https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#feature

https://spark.apache.org/docs/latest/ml-features.html

2.2 读取文件Pokemon.csv,理解数据字段含义

import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkFiles
# 创建spark应用
spark = SparkSession.builder.appName('SparkTest').getOrCreate()
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
pokemon = spark.read.csv(path = SparkFiles.get('Pokemon.csv'), header=True, inferSchema=True)
# 字段名字重命名
pokemon = pokemon.withColumnRenamed('Sp. Atk', 'Sp Atk')
pokemon = pokemon.withColumnRenamed('Sp. Def', 'Sp Def')
pokemon.show(5)

创建spark会话应用、字段重命名后的pokeman表:

+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|                Name|Type 1|Type 2|Total| HP|Attack|Defense|Sp Atk|Sp Def|Speed|Generation|Legendary|
+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
|           Bulbasaur| Grass|Poison|  318| 45|    49|     49|    65|    65|   45|         1|    false|
|             Ivysaur| Grass|Poison|  405| 60|    62|     63|    80|    80|   60|         1|    false|
|            Venusaur| Grass|Poison|  525| 80|    82|     83|   100|   100|   80|         1|    false|
|VenusaurMega Venu...| Grass|Poison|  625| 80|   100|    123|   122|   120|   80|         1|    false|
|          Charmander|  Fire|  null|  309| 39|    52|     43|    60|    50|   65|         1|    false|
+--------------------+------+------+-----+---+------+-------+------+------+-----+----------+---------+
only showing top 5 rows

字段名类型:

pokemon.dtypes
“””
[('Name', 'string'),
 ('Type 1', 'string'),
 ('Type 2', 'string'),
 ('Total', 'int'),
 ('HP', 'int'),
 ('Attack', 'int'),
 ('Defense', 'int'),
 ('Sp Atk', 'int'),
 ('Sp Def', 'int'),
 ('Speed', 'int'),
 ('Generation', 'int'),
 ('Legendary', 'boolean')]
“””
# encoding=utf-8
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
# 任务5:SparkML基础:数据编码
# 步骤0:连接spark集群
spark = SparkSession.builder.appName('pyspark').getOrCreate()
# 步骤1:学习Spark ML中数据编码模块
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#feature
# https://spark.apache.org/docs/latest/ml-features.html
# 步骤2:读取文件Pokemon.csv,理解数据字段含义
# 步骤2.1:读取文件https://cdn.coggle.club/Pokemon.csv
spark.sparkContext.addFile("https://cdn.coggle.club/Pokemon.csv")
path = "file://"+SparkFiles.get("Pokemon.csv")
# 步骤2.2:将读取的进行保存,表头也需要保存
df = spark.read.csv(path=path, header=True, inferSchema= True)
df = df.withColumnRenamed('Sp. Atk', 'SpAtk')
df = df.withColumnRenamed('Sp. Def', 'SpDef')
df = df.withColumnRenamed('Type 1', 'Type1')
df = df.withColumnRenamed('Type 2', 'Type2')
df.show(n=3)
# 属于“类别属性”的字段:Type1, Type2, Generation
# 属于“数值属性”的字段:Total,HP,Attack,Defense,SpAtk,SpDef,Speed

2.3 将其中的类别属性使用onehotencoder

将类别属性进行 one hot 独热编码。先来看下onehotencoder的参数:

class pyspark.ml.feature.OneHotEncoder(*, 
                     inputCols=None, 
                     outputCols=None, 
                     handleInvalid='error', 
                     dropLast=True, 
                     inputCol=None, 
                     outputCol=None)
# 步骤3:将其中的类别属性使用onehotencoder
# 步骤3.1:将字符串类型特征转换为索引类型
# https://github.com/apache/spark/blob/master/examples/src/main/python/ml/string_indexer_example.py
indexer = StringIndexer(
    inputCols=["Type1", "Type2"],
    outputCols=["Type1_idx", "Type2_idx"],
    handleInvalid='skip')
df = indexer.fit(df).transform(df)
df.show(n=3)
# 步骤3.2:将索引类型特征转换为one-hot编码
# https://github.com/apache/spark/blob/master/examples/src/main/python/ml/onehot_encoder_example.py
one_hot_encoder = OneHotEncoder(
    inputCols=['Type1_idx', 'Type2_idx', 'Generation'],
    outputCols=["Type1_vec", "Type2_vec", "Generation_vec"])
df = one_hot_encoder.fit(df).transform(df)
df.show(n=3)

对应的字符串类型转为索引类型后、将索引特征转为one hot向量的结果:

+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+
|     Name|Type1| Type2|Total| HP|Attack|Defense|SpAtk|SpDef|Speed|Generation|Legendary|Type1_idx|Type2_idx|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+
|Bulbasaur|Grass|Poison|  318| 45|    49|     49|   65|   65|   45|         1|    false|      2.0|      2.0|
|  Ivysaur|Grass|Poison|  405| 60|    62|     63|   80|   80|   60|         1|    false|      2.0|      2.0|
| Venusaur|Grass|Poison|  525| 80|    82|     83|  100|  100|   80|         1|    false|      2.0|      2.0|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+
only showing top 3 rows
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+
|     Name|Type1| Type2|Total| HP|Attack|Defense|SpAtk|SpDef|Speed|Generation|Legendary|Type1_idx|Type2_idx|     Type1_vec|     Type2_vec|Generation_vec|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+
|Bulbasaur|Grass|Poison|  318| 45|    49|     49|   65|   65|   45|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|
|  Ivysaur|Grass|Poison|  405| 60|    62|     63|   80|   80|   60|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|
| Venusaur|Grass|Poison|  525| 80|    82|     83|  100|  100|   80|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+
only showing top 3 rows

2.4 对其中的数值属性字段使用 minmaxscaler

对数值属性字段我们常用归一化(如果是常用的最大-最小归一化),公式为:Rescaled(e_i) = (e_i - E_min) / (E_max - E_min) * (max - min) + min。minmaxscaler其参数如下:

pyspark.ml.feature.MinMaxScaler(*,
                 min = 0.0,
                 max = 1.0,
                 inputCol = None,
                 outputCol = None)
# 步骤4:对其中的数值属性字段使用minmaxscaler
# https://stackoverflow.com/questions/60281354/apply-minmaxscaler-on-multiple-columns-in-pyspark
columns_to_scale = ["Total", "HP", "Attack", "Defense", "SpAtk", "SpDef", "Speed"]
assemblers, scalers = list(), list()
for col in columns_to_scale:
    vec = VectorAssembler(inputCols=[col], outputCol=col + "_vec")
    assemblers.append(vec)
    sc = MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled")
    scalers.append(sc)
pipeline = Pipeline(stages=assemblers + scalers)
df = pipeline.fit(df).transform(df)
df.show(n=3)

对应的结果为:

+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+---------+------+----------+-----------+---------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     Name|Type1| Type2|Total| HP|Attack|Defense|SpAtk|SpDef|Speed|Generation|Legendary|Type1_idx|Type2_idx|     Type1_vec|     Type2_vec|Generation_vec|Total_vec|HP_vec|Attack_vec|Defense_vec|SpAtk_vec|SpDef_vec|Speed_vec|        Total_scaled|           HP_scaled|       Attack_scaled|      Defense_scaled|        SpAtk_scaled|        SpDef_scaled|        Speed_scaled|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+---------+------+----------+-----------+---------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Bulbasaur|Grass|Poison|  318| 45|    49|     49|   65|   65|   45|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|  [318.0]|[45.0]|    [49.0]|     [49.0]|   [65.0]|   [65.0]|   [45.0]|[0.21694915254237...|[0.2953020134228188]|[0.21666666666666...|[0.15813953488372...|[0.3235294117647059]|[0.2142857142857143]|[0.25806451612903...|
|  Ivysaur|Grass|Poison|  405| 60|    62|     63|   80|   80|   60|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|  [405.0]|[60.0]|    [62.0]|     [63.0]|   [80.0]|   [80.0]|   [60.0]|[0.3644067796610169]|[0.3959731543624161]|[0.2888888888888889]|[0.22325581395348...|[0.4117647058823529]|[0.28571428571428...|[0.3548387096774194]|
| Venusaur|Grass|Poison|  525| 80|    82|     83|  100|  100|   80|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|  [525.0]|[80.0]|    [82.0]|     [83.0]|  [100.0]|  [100.0]|   [80.0]|[0.5677966101694915]|[0.5302013422818792]|               [0.4]|[0.31627906976744...|[0.5294117647058824]| [0.380952380952381]|[0.4838709677419355]|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+---------+------+----------+-----------+---------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

2.5 对编码后的属性使用pca进行降维(维度可以自己选择)

PCA降维,这里我选维度K=5。

# 步骤5:对编码后的属性使用pca进行降维(维度可以自己选择)
# encoded features: Type1_vec, Type2_vec, Generation_vec, Total_scaled, HP_scaled,
#   Attack_scaled, Defense_scaled, SpAtk_scaled, SpDef_scaled, Speed_scaled
cols = ["Type1_vec", "Type2_vec", "Generation_vec", "Total_scaled", "HP_scaled",
        "Attack_scaled", "Defense_scaled", "SpAtk_scaled", "SpDef_scaled", "Speed_scaled"]
assembler = VectorAssembler(inputCols=cols, outputCol="features")
df = assembler.transform(df)
df.select("features").show(n=3)
# https://github.com/apache/spark/blob/master/examples/src/main/python/ml/pca_example.py
pca = PCA(k=5, inputCol="features", outputCol="pca")
df = pca.fit(df).transform(df)
df.show(n=3)
rows = df.select("pca").collect()
print(rows[0].asDict())
spark.stop()

结果为:

+--------------------+
|            features|
+--------------------+
|(47,[2,19,35,40,4...|
|(47,[2,19,35,40,4...|
|(47,[2,19,35,40,4...|
+--------------------+
only showing top 3 rows
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+---------+------+----------+-----------+---------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     Name|Type1| Type2|Total| HP|Attack|Defense|SpAtk|SpDef|Speed|Generation|Legendary|Type1_idx|Type2_idx|     Type1_vec|     Type2_vec|Generation_vec|Total_vec|HP_vec|Attack_vec|Defense_vec|SpAtk_vec|SpDef_vec|Speed_vec|        Total_scaled|           HP_scaled|       Attack_scaled|      Defense_scaled|        SpAtk_scaled|        SpDef_scaled|        Speed_scaled|            features|                 pca|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+---------+------+----------+-----------+---------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Bulbasaur|Grass|Poison|  318| 45|    49|     49|   65|   65|   45|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|  [318.0]|[45.0]|    [49.0]|     [49.0]|   [65.0]|   [65.0]|   [45.0]|[0.21694915254237...|[0.2953020134228188]|[0.21666666666666...|[0.15813953488372...|[0.3235294117647059]|[0.2142857142857143]|[0.25806451612903...|(47,[2,19,35,40,4...|[0.34275937676253...|
|  Ivysaur|Grass|Poison|  405| 60|    62|     63|   80|   80|   60|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|  [405.0]|[60.0]|    [62.0]|     [63.0]|   [80.0]|   [80.0]|   [60.0]|[0.3644067796610169]|[0.3959731543624161]|[0.2888888888888889]|[0.22325581395348...|[0.4117647058823529]|[0.28571428571428...|[0.3548387096774194]|(47,[2,19,35,40,4...|[0.32329833337804...|
| Venusaur|Grass|Poison|  525| 80|    82|     83|  100|  100|   80|         1|    false|      2.0|      2.0|(17,[2],[1.0])|(17,[2],[1.0])| (6,[1],[1.0])|  [525.0]|[80.0]|    [82.0]|     [83.0]|  [100.0]|  [100.0]|   [80.0]|[0.5677966101694915]|[0.5302013422818792]|               [0.4]|[0.31627906976744...|[0.5294117647058824]| [0.380952380952381]|[0.4838709677419355]|(47,[2,19,35,40,4...|[0.29572580767124...|
+---------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+---------+---------+--------------+--------------+--------------+---------+------+----------+-----------+---------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows
{'pca': DenseVector([0.3428, -0.8743, -0.6616, 0.0442, 0.7151])}
相关文章
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
61 3
|
5月前
|
存储 分布式计算 Java
|
5月前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
272 4
|
5月前
|
存储 缓存 分布式计算
|
5月前
|
SQL 存储 分布式计算
|
5月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
72 1
|
6月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
49 1
|
7月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
分布式计算 定位技术 Scala
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
140 0