【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: 标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。

什么数据规范化?

规范化(Normalization)是一种数据预处理技术,用于将不同范围的特征值映射到相同的范围内。其中,StandardScaler 是一种规范化的方法,它将特征值转换为均值为 0、方差为 1 的标准正态分布。

需要注意的是,StandardScaler 的使用需要对数据进行归一化,以确保每个特征值的范围相同。否则,具有较大范围的特征值可能会在标准化后对结果产生较大的影响。

为什么要对数据进行规范化处理?

对数据进行规范化处理有以下几个原因:

  1. 降低特征的重要性:在机器学习中,不同的特征通常具有不同的尺度和范围,这可能会导致某些特征在预测中具有较大的重要性。通过规范化处理,可以将所有特征缩放到相同的尺度和范围,降低特征的重要性,从而提高模型的稳定性和可靠性。

  2. 提高模型的精度:在某些机器学习算法中,例如K均值聚类和支持向量机,距离和相似度的度量对算法的结果产生很大影响。如果特征具有不同的尺度和范围,可能会导致距离和相似度的度量产生偏差,从而降低模型的精度。通过规范化处理,可以消除这种偏差,提高模型的精度。

  3. 加速模型的训练:在机器学习中,通常需要对数据进行多次迭代来训练模型。如果特征具有不同的尺度和范围,可能会导致收敛速度较慢,从而延长训练时间。通过规范化处理,可以提高收敛速度,加速模型的训练。

  4. 优化计算资源的利用:在大数据环境中,对数据进行规范化处理可以使得机器学习算法在分布式计算环境中更好地运行,从而充分利用计算资源,提高计算效率。

进行规范化处理可以提高模型的稳定性、可靠性、精度、训练速度和计算效率,从而更好地应用于实际场景中。

Spark ML 之 StandardScaler

StandardScaler 是 Spark ML 中的一个数据转换器,用于对数据进行规范化处理。它可以将数据集的每个特征按照均值和标准差进行转换,使得所有特征具有相同的尺度和范围,从而降低特征的重要性,提高模型的稳定性和可靠性。

Spark ML 应用示例

首先,引入 Spark ML 的依赖,这里不做过多的赘述了。

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

1.创建 SparkSession 对象

        val spark: SparkSession = SparkSession
                .builder
                .master("local[*]")
                .appName("Test")
                .getOrCreate()

2.创建一个示例数据集,格式为 DataFrame

        import spark.implicits._

        val data: DataFrame = Seq(
            (1.0, 2.0, 3.0),
            (2.0, 3.0, 4.0),
            (3.0, 4.0, 5.0)
        ).toDF("feature1","feature2","feature3")

3.将 DataFrame 转换为向量

        val assembler: VectorAssembler = new VectorAssembler()
                .setInputCols(Array("feature1", "feature2", "feature3"))
                .setOutputCol("features")

        val assemblerData: DataFrame = assembler.transform(data)

将所有的列都作为输入列]("feature1", "feature2", "feature3"),转换为特征向量。

VectorAssembler 是 Spark MLlib 中的一个Transformer,其作用是将多个列组合成一个特征向量。

在机器学习中,通常需要将多个特征组合在一起作为模型的输入,而 VectorAssembler 可以很方便地完成这个任务,它可以将数据集中的多个列组合成一个稠密的向量或者一个稀疏的向量,这取决于输入的列和参数设置。

4.创建 StandardScaler 实例,并设置输入和输出列

        val scaler = new StandardScaler()
                .setInputCol("features")
                .setOutputCol("scaled_features")
                .setWithMean(true)
                .setWithStd(true)

参数说明:

  • .setInputCol("features") 设置输入列的名称为 "features",即指定要对哪个列的数据进行规范化处理。这里的 "features" 是我们在转换向量时 DataFrame 的输出列名。

  • .setOutputCol("scaled_features") 设置输出列的名称为 "scaled_features",即指定规范化处理后的数据要保存到哪个列。这个列名可以根据需要自行命名,不一定非得叫 "scaled_features"

  • .setWithMean(true) 设置是否将数据中心化,即将数据的均值设为 0。默认值为 false,即不中心化。

  • .setWithStd(true) 设置是否将数据标准化,即将数据除以标准差。默认值为 true,即标准化数据,使得数据的均值为 0,方差为 1,从而将数据转化为标准正态分布。

中心化就是将数据减去其均值,使其以 0 为中心点,也称为零均值化。具体来说,对于一个向量 X,其中心化后的向量 X' 可以表示为: X' = X - mean(X)
其中 mean(X) 是向量 X 的均值。中心化后的向量 X' 的均值将会变为 0,因为:mean(X') = mean(X - mean(X)) = mean(X) - mean(X) = 0
这样做的好处是,消除了数据的平移性,将数据转化为以 0 为中心的分布,便于不同指标之间的比较和加权。同时,中心化并不会改变数据的比例关系,也就是说,中心化后的数据与原始数据之间的关系不会发生变化,因此,中心化操作并不会影响模型的精度和稳定性,反而可以提高模型的精度和稳定性。

5.对数据集进行拟合与转换

        val scalerModel = scaler.fit(assemblerData)
        val scaledData = scalerModel.transform(assemblerData)

6.输出处理完成的数据

        scaledData.show(false)

输入结果如下所示:

+--------+--------+--------+-------------+----------------+
|feature1|feature2|feature3|features     |scaled_features |
+--------+--------+--------+-------------+----------------+
|1.0     |2.0     |3.0     |[1.0,2.0,3.0]|[-1.0,-1.0,-1.0]|
|2.0     |3.0     |4.0     |[2.0,3.0,4.0]|[0.0,0.0,0.0]   |
|3.0     |4.0     |5.0     |[3.0,4.0,5.0]|[1.0,1.0,1.0]   |
+--------+--------+--------+-------------+----------------+

将向量 Vector 拆分为列

从上面的示例中,可以看到 scaled_features 列就是规范化处理后的数据,但还需要进一步处理该数据,取到每列数据。

因为在进行数据预测时,我们使用的特征一般都是整型或浮点型,然而我们模型训练后的数据 scaled_features 列是一个向量类型的数据,注意,它不是一个数组!那么该如何取出该列中的每个元素呢?

在 Spark 3.0 以后提供了 vector_to_array() 方法用于将向量转换为数组,如下所示:

        val dataFrame: DataFrame = scaledData
                .withColumn("new_scaled_features", vector_to_array(col("scaled_features")))

        dataFrame.show(false)

其中 withColumn() 方法将添加一个新的名为 "new_scaled_features" 的列到新的 DataFrame 中,将 "scaled_features" 列向量转换成数组作为值。

输出结果如下所示:

+--------+--------+--------+-------------+----------------+-------------------+
|feature1|feature2|feature3|features     |scaled_features |new_scaled_features|
+--------+--------+--------+-------------+----------------+-------------------+
|1.0     |2.0     |3.0     |[1.0,2.0,3.0]|[-1.0,-1.0,-1.0]|[-1.0, -1.0, -1.0] |
|2.0     |3.0     |4.0     |[2.0,3.0,4.0]|[0.0,0.0,0.0]   |[0.0, 0.0, 0.0]    |
|3.0     |4.0     |5.0     |[3.0,4.0,5.0]|[1.0,1.0,1.0]   |[1.0, 1.0, 1.0]    |
+--------+--------+--------+-------------+----------------+-------------------+

其中 new_scaled_features 列就是我们转换成数组之后的列,现在我们就可以对其进行直接提取啦,如下所示:

        dataFrame
                .select(
                    col("new_scaled_features").getItem(0).alias("first"),
                    col("new_scaled_features").getItem(1).alias("second"),
                    col("new_scaled_features").getItem(2).alias("third")
                )
                .show(false)

输出结果如下:

+-----+------+-----+
|first|second|third|
+-----+------+-----+
|-1.0 |-1.0  |-1.0 |
|0.0  |0.0   |0.0  |
|1.0  |1.0   |1.0  |
+-----+------+-----+

向量拆分技巧 GOT!

相关文章
|
1天前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5天前
|
分布式计算 定位技术 Scala
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
使用spark基于出租车GPS数据实现车辆数量统计以及北京每个城区的车辆位置点数分析
|
5天前
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
|
5天前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
22天前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。
|
1月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
1天前
|
分布式计算 Hadoop 大数据
大数据技术:Hadoop与Spark的对比
【6月更文挑战第15天】**Hadoop与Spark对比摘要** Hadoop是分布式系统基础架构,擅长处理大规模批处理任务,依赖HDFS和MapReduce,具有高可靠性和生态多样性。Spark是快速数据处理引擎,侧重内存计算,提供多语言接口,支持机器学习和流处理,处理速度远超Hadoop,适合实时分析和交互式查询。两者在资源占用和生态系统上有差异,适用于不同应用场景。选择时需依据具体需求。
|
4天前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
26 6

热门文章

最新文章