Spark机器学习管道 - Transformer

简介: Spark机器学习管道 - Transformer

一、实验目的

掌握Spark机器学习管道中常用Transformer的使用。

二、实验内容

1、应用Binarizer transformer,将连续值变量转换为两个离散的值。

 2、使用Bucketizer transformer将温度列放入三个桶中,输出按温度列排序。

 3、使用OneHotEncoder transformer将序数值转换为分类值。

 4、使用Tokenizer Transformer执行分词任务。

 5、使用StopWordsRemover transformer来删除英语停止词。

 6、使用HashingTF transformer将单词转换为数字表示。

 7、使用VectorAssembler transformer来组合特征到一个特征向量。


三、实验原理

Transformer是一个算法,用来将一个输入DataFrame转换到另一个DataFrame,并向其添加一个或多个特征。Transformers的设计目的是通过在特征工程步骤和模型评估步骤中操作一个或多个列来转换DataFrame中的数据。

 Spark ML提供两种类型的transformers:特征transformer和机器学习模型。

 一个feature transformer通过对输入数据集中的一个列应用转换来创建一个或多个新列,并返回一个附加了新列的新DataFrame。例如,如果输入数据集有一个包含句子的列,则可以使用feature transformer将句子拆分为单词,并创建一个新的列,该列将单词存储在数组中。

 一个模型代表一个机器学习模型。它接受一个DataFrame作为输入,并输出带有每个输入特征向量的预测标签的新DataFrame。输入数据集必须具有包含特征向量的列。模型读取包含特征向量的列,为每个特征向量预测一个标签,并返回一个新的DataFrame,其中预测标签作为新列附加。

 下图描述了transformer的工作模式,DF1中的阴影部分表示输入列,DF2的阴影部分表示输出列。

461bf2aac7f64714a06e1568cb75a52c.png


四、实验环境

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


五、实验步骤

5.1 启动Spark集群和Zeppelin服务器

在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh
3.  $ zeppelin-daemon.sh start

然后使用jps命令查看启动的进程,确保Spark集群和Zeppelin服务器已经正确启动。

 2、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页,点击”Create new note”链接,创建一个新的笔记本。如下图所示:

0d7a6ea16088446db8d64882e9b0740e.png


5.2 使用Binarizer transformer将temperature列值转换到两个桶

Binarizer transformer(二元转换器)简单地将输入列的值转换为两组。第一组包含小于或等于指定阈值的值,输出列中的值将为零。对于其他值,输出列中的值将是1。输入列必须是double类型或VectorUDT。

 在zeppelin中输入以下代码:

1.  // 使用Binarizer transformer将温度值转换到两个桶
2.  import org.apache.spark.ml.feature.Binarizer
3.       
4.  // 构造原始数据集(DataFrame)
5.  val arrival_data = spark.createDataFrame(
6.          Seq(("SFO", "B737", 18, 95.1, "late"),
7.              ("SEA", "A319", 5, 65.7, "ontime"),
8.              ("LAX", "B747", 15, 31.5, "late"),
9.              ("ATL", "A319", 14, 40.5, "late") ))
10.         .toDF("origin", "model", "hour", "temperature", "arrival")
11.      
12. // Transformer(Binarizer transformer) - 这是一个二元分类转换器
13. val binarizer = new Binarizer().setInputCol("temperature").setOutputCol("freezing").setThreshold(35.6)
14. binarizer.transform(arrival_data).show

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

3c093391766f414c9b0ef5ff6d94c867.png


5.3 使用Bucketizer transformer将温度列放入三个桶中,输出按温度列排序。

使用 Bucketizer transformer将连续值变量转换为所需的一组离散的值。Bucketizer transformer是Binarizer的通用版本。

 在zeppelin中输入以下代码:

1.  // 使用Bucketizer transformer将温度值转换到三个桶
2.  import org.apache.spark.ml.feature.Bucketizer
3.       
4.  val bucketBorders = Array(-1.0, 32.0, 70.0, 150.0)
5.       
6.  val bucketer = new Bucketizer().setInputCol("temperature").setOutputCol("intensity").setSplits(bucketBorders)
7.  val output = bucketer.transform(arrival_data)
8.  output.show

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:


055bc7d97e02460594975894f524dc29.png

显示转换结果,按温度排序。在zeppelin中输入以下代码:

1.  output.select("temperature", "intensity").orderBy("temperature").show

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

0dba18c75520471284c24357871de8f9.png


5.4 使用OneHotEncoder transformer将序数值转换为分类值。

OneHotEncoder transformer在处理数值分类值时通常使用。OneHotEncoder transformer将一个数字的分类值映射到一个二元向量(binary vector)中,以有意地删除数字分类值的隐式排序。

 在zeppelin中输入以下代码:

1.  // 使用OneHotEncoder transformer将序数值转换为分类值
2.  import org.apache.spark.ml.feature.OneHotEncoder
3.       
4.  val student_major_data = spark.createDataFrame(
5.                  Seq( ("John", "Math", 3),
6.                       ("Mary", "Engineering", 2),
7.                       ("Jeff", "Philosophy", 7),
8.                       ("Jane", "Math", 3),
9.                       ("Lyna", "Nursing", 4) )).toDF("user", "major","majorIdx")
10.      
11. val oneHotEncoder = new OneHotEncoder().setInputCol("majorIdx").setOutputCol("majorVect")
12. oneHotEncoder.transform(student_major_data).show()

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:


3e0beaba55434402a9feacc326adc902.png

5.5 使用Tokenizer Transformer执行分词任务。

Tokenizer transformer对由空格分隔的字符串进行分词,并返回一个单词数组。

 在zeppelin中输入以下代码:

1.  // 使用Tokenizer Transformer执行分词
2.  import org.apache.spark.ml.feature.Tokenizer
3.  import org.apache.spark.sql.functions._
4.       
5.  val text_data = spark.createDataFrame(
6.              Seq((1, "Spark is a unified data analytics engine"),
7.                  (2, "It is fun to work with Spark"),
8.                  (3, "There is a lot of exciting sessions at upcoming Spark summit"),
9.                  (4, "mllib transformer estimator evaluator and pipelines") )
10.             ).toDF("id", "line")
11.      
12. val tokenizer = new Tokenizer().setInputCol("line").setOutputCol("words")
13. val tokenized = tokenizer.transform(text_data)
14.      
15. tokenized.select("words").withColumn("tokens", size(col("words"))).show(false)

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:


ec602c38bf134034a351b13769263b40.png

5.6 使用StopWordsRemover transformer来删除英语停止词。

停止词(stop words)是一种语言中常用的词。在自然语言处理或机器学习的背景下,停止词往往会增加不必要的噪音,而不是提供任何有意义的贡献。因此,在分词步骤之后立即执行停止词删除步骤是很常见的。StopWordsRemover transformer是设计用来实现这个目的的。

 在zeppelin中输入以下代码:


1.  // 使用StopWordsRemover transformer来删除Tokenization示例中的单词中的英语停止词
2.  import org.apache.spark.ml.feature.StopWordsRemover
3.       
4.  // 加载英语中的停止词
5.  val enStopWords = StopWordsRemover.loadDefaultStopWords("english")
6.  // 转换:过滤停止词
7.  val remover = new StopWordsRemover().setStopWords(enStopWords).setInputCol("words").setOutputCol("filtered")
8.       
9.  // 来自上一示例中的tokenized
10. val cleanedTokens = remover.transform(tokenized)
11. cleanedTokens.select("words","filtered").show(false)

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

ca8f840b4e694c8e80221d180bb720e9.png


5.7 使用HashingTF transformer将单词转换为数字表示。

HashingTF transformer用于通过计算每行中每个单词的频率来将单词转换成数字表示。下面的例子将从上一示例中提供过滤后的列到HashingTF transformer。

 在zeppelin中输入以下代码:

1.  // 使用HashingTF transformer将单词转换为数字表示,通过哈希和统计
2.  import org.apache.spark.ml.feature.HashingTF
3.       
4.  val tf = new HashingTF().setInputCol("filtered").setOutputCol("TFOut").setNumFeatures(4096)
5.  val tfResult = tf.transform(cleanedTokens)
6.  tfResult.select("filtered", "TFOut").show(false)

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

8aea43e1636647e8b01a105db57f6e9f.png


我们通过下面的代码来理解HashingTF的转换过程:

1.  // 有一个单词数组,计算该数组中单词的频率向量
2.  val words = Array("the", "dog", "jumped", "over", "the")
3.       
4.  // 第一步是计算数组中每个元素的哈希码。可以使用内置的##方法来计算任何对象的哈希码
5.  val hashCodes = words.map { _.## }
6.       
7.  // 为了将哈希码转换为有效的向量索引,我们取每个哈希的模(哈希值除以向量的大小,本例中为16):
8.  val indices = hashCodes.map { code => Math.abs(code % 16) }
9.       
10. // 然后我们可以创建一个从索引到该索引出现次数的映射:
11. val indexFrequency = indices.groupBy(identity).mapValues {_.size.toDouble}
12.      
13. // 最后,我们可以把这个映射转换成一个稀疏向量,其中向量中每个元素的值就是这个特定索引出现的频率:
14. import org.apache.spark.ml.linalg._
15. val termFrequencies = Vectors.sparse(16, indexFrequency.toSeq)

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:


1bd11664f172443da75a7402e5bdb751.png

5.8 使用VectorAssembler transformer来组合特征到一个特征向量。

VectorAssembler transformer简单地将一组列合并成一个vector列。在机器学习术语中,这相当于将单个特征组合成单向量特征,以供ML算法学习。单个输入列的类型必须是下列类型之一:数值、布尔值或向量。输出向量列包含以指定顺序的所有列的值。这个transformer实际上几乎在每一个ML管道中都被使用,它的输出将被传递到一个estimator中。

 在zeppelin中输入以下代码:

1.  // 使用VectorAssembler transformer来组合特征到一个Vector特征
2.  import org.apache.spark.ml.feature.VectorAssembler
3.       
4.  val arrival_features = spark.createDataFrame(
5.                  Seq((18, 95.1, true),
6.                      (5, 65.7, true), 
7.                      (15, 31.5,false),
8.                      (14, 40.5, false))
9.                  ).toDF("hour", "temperature","on_time")
10.      
11. val assembler = new VectorAssembler().setInputCols(Array("hour","temperature", "on_time"))
12. .setOutputCol("features")
13. val output = assembler.transform(arrival_features)
14. output.show

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

6506273e48a845d78053dbbcf72d4159.png


结束语

Transformer是一个算法,用来将一个输入DataFrame转换到另一个DataFrame,并向其添加一个或多个特征。Transformers的设计目的是通过在特征工程步骤和模型评估步骤中操作一个或多个列来转换DataFrame中的数据。转换过程是在构建特征的上下文中,这些特征将被ML算法用于学习。这个过程通常涉及添加或删除列(特征),将列值从文本转换为数值,或者规范化某一列的值。

 在MLlib中使用ML算法是有严格要求的;它们要求所有的特征都是Double数据类型,包括标签。

 Spark ML提供两种类型的transformers:特征transformer和机器学习模型。


相关文章
|
7月前
|
机器学习/深度学习 自然语言处理 并行计算
【机器学习】Transformer:自然语言处理的巅峰之作
【机器学习】Transformer:自然语言处理的巅峰之作
236 0
|
6月前
|
机器学习/深度学习 人工智能 自然语言处理
【机器学习】Transformer模型大小与性能探究
【机器学习】Transformer模型大小与性能探究
390 5
|
7月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
150 6
|
7月前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。
125 6
|
5月前
|
机器学习/深度学习 自然语言处理 PyTorch
【机器学习】自然语言处理(NLP)领域革命性突破的模型——Transformer
【机器学习】自然语言处理(NLP)领域革命性突破的模型——Transformer
|
6月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
48 0
|
7月前
|
机器学习/深度学习 分布式计算 算法
使用Spark进行机器学习
【5月更文挑战第2天】使用Spark进行机器学习
91 2
|
分布式计算 大数据 Spark
【Spark Summit East 2017】基于Spark ML和GraphFrames的大规模文本分析管道
本讲义出自Alexey Svyatkovskiy在Spark Summit East 2017上的演讲,主要介绍了基于Spark ML和GraphFrames的大规模文本分析管道的实现,并介绍了用于的描绘直方图、计算描述性统计的跨平台的Scala数据聚合基元——Histogrammar package,并分享了非结构化数据处理、高效访问的数据存储格式以及大规模图处理等问题。
2139 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
143 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0