Spark机器学习管道 - Estimator

简介: Spark机器学习管道 - Estimator

一、实验目的

掌握Spark机器学习管道中常用Estimator的使用。

二、实验内容

1、使用IDF estimator,计算每个单词的重要性。

 2、使用StringIndexer estimator来对电影类型进行编码。

 3、使用OneHotEncoderEstimator estimator将分类值的索引编码为二元向量。

 4、使用MinMaxScaler estimator对数值数据进行规范化。

 5、使用MinMaxScaler estimator对数值数据进行标准化。


三、实验原理

一个Estimator代表了一种机器学习算法,用来在训练数据集上训练或拟合机器学习模型。它实现了一个名为fit的方法,该方法接受一个DataFrame作为参数并返回一个机器学习模型。

 Estimator所代表的算法可分为两类,一类是用于机器学习的算法,另一类是用于进行数据转换的算法。

 从技术的角度来看,一个estimator有一个名为fit的函数,它在输入列上应用一个算法,结果被封装在一个叫做Model的对象类型中,它是一个Transformer类型。输入列和输出列名称可以在estimator的构造过程中指定。

 下图描述了一个estimator及其输入和输出。


451154ace14d4b2da33437060b44718c.png

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


五、实验步骤

5.1 启动Spark集群和Zeppelin服务器。

在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh
3.  $ zeppelin-daemon.sh start

然后使用jps命令查看启动的进程,确保Spark集群和Zeppelin服务器已经正确启动。

 2、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页,点击”Create new note”链接,创建一个新的笔记本。如下图所示:

24a0b5cc2b754515928476a669b37891.png


5.2 使用IDF estimator,计算每个单词的重要性。

IDF estimator是用于处理文本的常用的estimators之一。它的名字是inverse document frequency(反转文档频率)的缩写。这个estimator经常在文本被分词和术语频率被计算之后立即使用。这个estimator背后的思想是通过计算它出现的文档数量来计算每个单词的重要性或权重。

 在zeppelin中输入以下代码:

1.  // 使用IDF estimator来计算每个单词的权重
2.  import org.apache.spark.ml.feature.Tokenizer
3.  import org.apache.spark.ml.feature.HashingTF
4.  import org.apache.spark.ml.feature.IDF
5.       
6.  // 构造一个DataFrame,代表一个文档
7.  val text_data = spark.createDataFrame(Seq(
8.                              (1, "Spark is a unified data analytics engine"),
9.                              (2, "Spark is cool and it is fun to work with Spark"),
10.                             (3, "There is a lot of exciting sessions at upcoming Spark summit"),
11.                             (4, "mllib transformer estimator evaluator and pipelines") )
12.                 ).toDF("id", "line")
13.      
14. // 分析转换器
15. val tokenizer = new Tokenizer().setInputCol("line").setOutputCol("words")
16. val tkResult = tokenizer.transform(text_data)
17.      
18. // HashingTF转换器
19. val tf = new HashingTF().setInputCol("words").setOutputCol("wordFreqVect").setNumFeatures(4096)
20. val tfResult = tf.transform(tkResult )    // Tokenizer transformer的输出列是HashingTF的输入
21.      
22. // IDF estimator
23. // HashingTF转换器的输出是IDF estimator的输入
24. val idf = new IDF().setInputCol("wordFreqVect").setOutputCol("features") 
25.      
26. // 因为IDF是一个estimator,所以调用fit函数, 得到一个学习过的模型
27. val idfModel = idf.fit(tfResult)
28.      
29. // 返回对象是一个模型(Model), 它是Transformer类型
30. val weightedWords = idfModel.transform(tfResult)
31. // weightedWords.select("label", "features").show(false)
32.  weightedWords.select("features").show(false)

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:


9422df9821f543618e295bd1523574ea.png

5.3 使用StringIndexer estimator来对电影类型进行编码。

StringIndexer estimator是一个知道什么时候处理包含分类值的文本数据的estimator。它将一个分类值编码成一个基于其频率的索引,这样最频繁的分类值就会得到0的索引值,以此类推。

 在zeppelin中输入以下代码:

1.  // 使用StringIndexer estimator来对电影类型进行编码
2.  import org.apache.spark.ml.feature.StringIndexer
3.       
4.  // 构造一个DataFrame
5.  val movie_data = spark.createDataFrame(
6.                              Seq((1, "Comedy"),
7.                                  (2, "Action"),
8.                                  (3, "Comedy"),
9.                                  (4, "Horror"),
10.                                 (5, "Action"),
11.                                 (6, "Comedy"))
12.                         ).toDF("id", "genre")
13.      
14. // StringIndexer estimator
15. val movieIndexer = new StringIndexer().setInputCol("genre").setOutputCol("genreIdx")
16.      
17. // 首先拟合数据
18. val movieIndexModel = movieIndexer.fit(movie_data)
19.      
20. // 使用返回的transformer来转换该数据
21. val indexedMovie = movieIndexModel.transform(movie_data)
22.      
23. // 查看结果
24. indexedMovie.orderBy("genreIdx").show()

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

5f3604cb134c4a88b1dfc31e945307a4.png


5.4 使用OneHotEncoderEstimator estimator将分类值的索引编码为二元向量。

OneHotEncoderEstimator estimator是另一种有用的分类值的estimator,它将分类值的索引编码为二元向量。这个estimator经常与StringIndexer estimator一起使用,其中StringIndexer的输出成为这个estimator的输入。

 在zeppelin中输入以下代码:


1.  // OneHotEncoderEstimator estimator消费StringIndexer estimator的输出
2.  import org.apache.spark.ml.feature.OneHotEncoderEstimator
3.       
4.  // 输入列genreIdx是之前示例中StringIndex的输出列
5.  val oneHotEncoderEst = new OneHotEncoderEstimator().setInputCols(Array("genreIdx"))
6.  .setOutputCols(Array("genreIdxVector"))
7.       
8.  // 指使indexedMovie数据(在上一个示例中产生的)
9.  val oneHotEncoderModel = oneHotEncoderEst.fit(indexedMovie)
10. val oneHotEncoderVect = oneHotEncoderModel.transform(indexedMovie)
11.      
12. // 显示
13. oneHotEncoderVect .orderBy("genre").show()

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

f1e8d7679d324248bcd618b6207882ec.png


5.5 使用MinMaxScaler estimator对数值数据进行规范化。

规范化数值数据是将其原始范围映射到从0到1的范围的过程。当observations有多个不同范围的属性时,这一点特别有用。

 在zeppelin中输入以下代码:

1.  // 使用MinMaxScaler estimator来重新调节特征
2.  import org.apache.spark.ml.feature.MinMaxScaler
3.  import org.apache.spark.ml.linalg.Vectors
4.       
5.  // 构造DataFrame
6.  val employee_data = spark.createDataFrame(
7.                              Seq((1, Vectors.dense(125400, 5.3)),
8.                                  (2, Vectors.dense(179100, 6.9)),
9.                                  (3, Vectors.dense(154770, 5.2)),
10.                                 (4, Vectors.dense(199650, 4.11)))
11.                         ).toDF("empId", "features")
12.      
13. // MinMaxScaler estimator
14. val minMaxScaler = new MinMaxScaler().setMin(0.0)
15.                                      .setMax(5.0)
16.                                      .setInputCol("features")
17.                                      .setOutputCol("scaledFeatures")
18.      
19. // 拟合数据,建立模型
20. val scalerModel = minMaxScaler.fit(employee_data)
21.      
22. // 使用学习到的模型对数据集进行转换
23. val scaledData = scalerModel.transform(employee_data)
24.      
25. // 输出特征缩放到的范围
26. println(s"特征缩放到的范围: [${minMaxScaler.getMin},${minMaxScaler.getMax}]")
27.      
28. // 显示结果
29. scaledData.select("features", "scaledFeatures").show(false)

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:


23d87d3750b641439c7769c6bd789b1a.png

5.6 使用MinMaxScaler estimator对数值数据进行标准化。

除了数值数据规范化之外,另一个经常用于处理数值数据的操作称为标准化(standardization.)。当数值数据有一个接近与钟形曲线关闭的分布,这个操作尤其适用。标准化操作可以帮助将数据转换为标准化形式,其中数据的范围为-1和1,平均值为0。

 在zeppelin中输入以下代码:

1.  // 使用StandardScaler estimator标准化围绕均值0的特征
2.  import org.apache.spark.ml.feature.StandardScaler
3.  import org.apache.spark.ml.linalg.Vectors
4.       
5.  // 构造DataFrame
6.  val employee_data = spark.createDataFrame(Seq(
7.                              (1, Vectors.dense(125400, 5.3)),
8.                              (2, Vectors.dense(179100, 6.9)),
9.                              (3, Vectors.dense(154770, 5.2)),
10.                             (4, Vectors.dense(199650, 4.11)))
11.                         ).toDF("empId", "features")
12.      
13. // 将单位标准偏差设置为true并围绕平均值
14. val standardScaler = new StandardScaler().setWithStd(true)
15.                                          .setWithMean(true)
16.                                          .setInputCol("features")
17.                                          .setOutputCol("scaledFeatures")
18.      
19. // 拟合数据,建立模型
20. val standardMode = standardScaler.fit(employee_data)
21.      
22. // 使用学习到的模型对数据集进行转换
23. val standardData = standardMode.transform(employee_data)
24.      
25. // 显示结果
26. standardData.show(false)

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:


39c5143dab314d318f197889847263f4.png


结束语

一个Estimator代表了一种机器学习算法,用来在训练数据集上训练或拟合机器学习模型。它实现了一个名为fit的方法,该方法接受一个DataFrame作为参数并返回一个机器学习模型。

 Estimator所代表的算法可分为两类,一类是用于机器学习的算法,另一类是用于进行数据转换的算法。例如,称为LinearRegression的ML算法就属于第一种类型,它的fit方法返回一个LinearRegressionModel类的实例。它用于诸如预测房价等回归任务。而StringIndexer就属于第二种类型,它用来将一列的分类值编码成索引,这样每个分类值的索引值都是基于它出现在DataFrame的整个输入列中的频率。

 从技术的角度来看,一个estimator有一个名为fit的函数,它在输入列上应用一个算法,结果被封装在一个叫做Model的对象类型中,它是一个Transformer类型。输入列和输出列名称可以在estimator的构造过程中指定。


相关文章
|
1月前
|
机器学习/深度学习
【机器学习】解决机器学习中OneVsRestClassifier的网格调参Invalid parameter max_depth for estimator OneVsRestClassifier
文章介绍了如何使用XGBClassifier和OneVsRestClassifier进行网格搜索调参,以找到最佳的模型参数。
26 5
|
3月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
28 0
|
4月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
|
4月前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。
|
4月前
|
机器学习/深度学习 分布式计算 算法
使用Spark进行机器学习
【5月更文挑战第2天】使用Spark进行机器学习
45 2
|
4月前
|
机器学习/深度学习 分布式计算 算法
Spark MLlib简介与机器学习流程
Spark MLlib简介与机器学习流程
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
106 1
Spark快速大数据分析PDF下载读书分享推荐
|
分布式计算 大数据 Spark
【Spark Summit East 2017】基于Spark ML和GraphFrames的大规模文本分析管道
本讲义出自Alexey Svyatkovskiy在Spark Summit East 2017上的演讲,主要介绍了基于Spark ML和GraphFrames的大规模文本分析管道的实现,并介绍了用于的描绘直方图、计算描述性统计的跨平台的Scala数据聚合基元——Histogrammar package,并分享了非结构化数据处理、高效访问的数据存储格式以及大规模图处理等问题。
2126 0
|
1月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
118 3
|
11天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
24 3