一、实验目的
掌握Spark机器学习管道中常用Transformer的使用。
二、实验内容
1、应用Binarizer transformer,将连续值变量转换为两个离散的值。
2、使用Bucketizer transformer将温度列放入三个桶中,输出按温度列排序。
3、使用OneHotEncoder transformer将序数值转换为分类值。
4、使用Tokenizer Transformer执行分词任务。
5、使用StopWordsRemover transformer来删除英语停止词。
6、使用HashingTF transformer将单词转换为数字表示。
7、使用VectorAssembler transformer来组合特征到一个特征向量。
三、实验原理
Transformer是一个算法,用来将一个输入DataFrame转换到另一个DataFrame,并向其添加一个或多个特征。Transformers的设计目的是通过在特征工程步骤和模型评估步骤中操作一个或多个列来转换DataFrame中的数据。
Spark ML提供两种类型的transformers:特征transformer和机器学习模型。
一个feature transformer通过对输入数据集中的一个列应用转换来创建一个或多个新列,并返回一个附加了新列的新DataFrame。例如,如果输入数据集有一个包含句子的列,则可以使用feature transformer将句子拆分为单词,并创建一个新的列,该列将单词存储在数组中。
一个模型代表一个机器学习模型。它接受一个DataFrame作为输入,并输出带有每个输入特征向量的预测标签的新DataFrame。输入数据集必须具有包含特征向量的列。模型读取包含特征向量的列,为每个特征向量预测一个标签,并返回一个新的DataFrame,其中预测标签作为新列附加。
下图描述了transformer的工作模式,DF1中的阴影部分表示输入列,DF2的阴影部分表示输出列。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
五、实验步骤
5.1 启动Spark集群和Zeppelin服务器
在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh 3. $ zeppelin-daemon.sh start
然后使用jps命令查看启动的进程,确保Spark集群和Zeppelin服务器已经正确启动。
2、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页,点击”Create new note”链接,创建一个新的笔记本。如下图所示:
5.2 使用Binarizer transformer将temperature列值转换到两个桶
Binarizer transformer(二元转换器)简单地将输入列的值转换为两组。第一组包含小于或等于指定阈值的值,输出列中的值将为零。对于其他值,输出列中的值将是1。输入列必须是double类型或VectorUDT。
在zeppelin中输入以下代码:
1. // 使用Binarizer transformer将温度值转换到两个桶 2. import org.apache.spark.ml.feature.Binarizer 3. 4. // 构造原始数据集(DataFrame) 5. val arrival_data = spark.createDataFrame( 6. Seq(("SFO", "B737", 18, 95.1, "late"), 7. ("SEA", "A319", 5, 65.7, "ontime"), 8. ("LAX", "B747", 15, 31.5, "late"), 9. ("ATL", "A319", 14, 40.5, "late") )) 10. .toDF("origin", "model", "hour", "temperature", "arrival") 11. 12. // Transformer(Binarizer transformer) - 这是一个二元分类转换器 13. val binarizer = new Binarizer().setInputCol("temperature").setOutputCol("freezing").setThreshold(35.6) 14. binarizer.transform(arrival_data).show
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.3 使用Bucketizer transformer将温度列放入三个桶中,输出按温度列排序。
使用 Bucketizer transformer将连续值变量转换为所需的一组离散的值。Bucketizer transformer是Binarizer的通用版本。
在zeppelin中输入以下代码:
1. // 使用Bucketizer transformer将温度值转换到三个桶 2. import org.apache.spark.ml.feature.Bucketizer 3. 4. val bucketBorders = Array(-1.0, 32.0, 70.0, 150.0) 5. 6. val bucketer = new Bucketizer().setInputCol("temperature").setOutputCol("intensity").setSplits(bucketBorders) 7. val output = bucketer.transform(arrival_data) 8. output.show
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
显示转换结果,按温度排序。在zeppelin中输入以下代码:
1. output.select("temperature", "intensity").orderBy("temperature").show
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.4 使用OneHotEncoder transformer将序数值转换为分类值。
OneHotEncoder transformer在处理数值分类值时通常使用。OneHotEncoder transformer将一个数字的分类值映射到一个二元向量(binary vector)中,以有意地删除数字分类值的隐式排序。
在zeppelin中输入以下代码:
1. // 使用OneHotEncoder transformer将序数值转换为分类值 2. import org.apache.spark.ml.feature.OneHotEncoder 3. 4. val student_major_data = spark.createDataFrame( 5. Seq( ("John", "Math", 3), 6. ("Mary", "Engineering", 2), 7. ("Jeff", "Philosophy", 7), 8. ("Jane", "Math", 3), 9. ("Lyna", "Nursing", 4) )).toDF("user", "major","majorIdx") 10. 11. val oneHotEncoder = new OneHotEncoder().setInputCol("majorIdx").setOutputCol("majorVect") 12. oneHotEncoder.transform(student_major_data).show()
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.5 使用Tokenizer Transformer执行分词任务。
Tokenizer transformer对由空格分隔的字符串进行分词,并返回一个单词数组。
在zeppelin中输入以下代码:
1. // 使用Tokenizer Transformer执行分词 2. import org.apache.spark.ml.feature.Tokenizer 3. import org.apache.spark.sql.functions._ 4. 5. val text_data = spark.createDataFrame( 6. Seq((1, "Spark is a unified data analytics engine"), 7. (2, "It is fun to work with Spark"), 8. (3, "There is a lot of exciting sessions at upcoming Spark summit"), 9. (4, "mllib transformer estimator evaluator and pipelines") ) 10. ).toDF("id", "line") 11. 12. val tokenizer = new Tokenizer().setInputCol("line").setOutputCol("words") 13. val tokenized = tokenizer.transform(text_data) 14. 15. tokenized.select("words").withColumn("tokens", size(col("words"))).show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.6 使用StopWordsRemover transformer来删除英语停止词。
停止词(stop words)是一种语言中常用的词。在自然语言处理或机器学习的背景下,停止词往往会增加不必要的噪音,而不是提供任何有意义的贡献。因此,在分词步骤之后立即执行停止词删除步骤是很常见的。StopWordsRemover transformer是设计用来实现这个目的的。
在zeppelin中输入以下代码:
1. // 使用StopWordsRemover transformer来删除Tokenization示例中的单词中的英语停止词 2. import org.apache.spark.ml.feature.StopWordsRemover 3. 4. // 加载英语中的停止词 5. val enStopWords = StopWordsRemover.loadDefaultStopWords("english") 6. // 转换:过滤停止词 7. val remover = new StopWordsRemover().setStopWords(enStopWords).setInputCol("words").setOutputCol("filtered") 8. 9. // 来自上一示例中的tokenized 10. val cleanedTokens = remover.transform(tokenized) 11. cleanedTokens.select("words","filtered").show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.7 使用HashingTF transformer将单词转换为数字表示。
HashingTF transformer用于通过计算每行中每个单词的频率来将单词转换成数字表示。下面的例子将从上一示例中提供过滤后的列到HashingTF transformer。
在zeppelin中输入以下代码:
1. // 使用HashingTF transformer将单词转换为数字表示,通过哈希和统计 2. import org.apache.spark.ml.feature.HashingTF 3. 4. val tf = new HashingTF().setInputCol("filtered").setOutputCol("TFOut").setNumFeatures(4096) 5. val tfResult = tf.transform(cleanedTokens) 6. tfResult.select("filtered", "TFOut").show(false)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
我们通过下面的代码来理解HashingTF的转换过程:
1. // 有一个单词数组,计算该数组中单词的频率向量 2. val words = Array("the", "dog", "jumped", "over", "the") 3. 4. // 第一步是计算数组中每个元素的哈希码。可以使用内置的##方法来计算任何对象的哈希码 5. val hashCodes = words.map { _.## } 6. 7. // 为了将哈希码转换为有效的向量索引,我们取每个哈希的模(哈希值除以向量的大小,本例中为16): 8. val indices = hashCodes.map { code => Math.abs(code % 16) } 9. 10. // 然后我们可以创建一个从索引到该索引出现次数的映射: 11. val indexFrequency = indices.groupBy(identity).mapValues {_.size.toDouble} 12. 13. // 最后,我们可以把这个映射转换成一个稀疏向量,其中向量中每个元素的值就是这个特定索引出现的频率: 14. import org.apache.spark.ml.linalg._ 15. val termFrequencies = Vectors.sparse(16, indexFrequency.toSeq)
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
5.8 使用VectorAssembler transformer来组合特征到一个特征向量。
VectorAssembler transformer简单地将一组列合并成一个vector列。在机器学习术语中,这相当于将单个特征组合成单向量特征,以供ML算法学习。单个输入列的类型必须是下列类型之一:数值、布尔值或向量。输出向量列包含以指定顺序的所有列的值。这个transformer实际上几乎在每一个ML管道中都被使用,它的输出将被传递到一个estimator中。
在zeppelin中输入以下代码:
1. // 使用VectorAssembler transformer来组合特征到一个Vector特征 2. import org.apache.spark.ml.feature.VectorAssembler 3. 4. val arrival_features = spark.createDataFrame( 5. Seq((18, 95.1, true), 6. (5, 65.7, true), 7. (15, 31.5,false), 8. (14, 40.5, false)) 9. ).toDF("hour", "temperature","on_time") 10. 11. val assembler = new VectorAssembler().setInputCols(Array("hour","temperature", "on_time")) 12. .setOutputCol("features") 13. val output = assembler.transform(arrival_features) 14. output.show
同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:
结束语
Transformer是一个算法,用来将一个输入DataFrame转换到另一个DataFrame,并向其添加一个或多个特征。Transformers的设计目的是通过在特征工程步骤和模型评估步骤中操作一个或多个列来转换DataFrame中的数据。转换过程是在构建特征的上下文中,这些特征将被ML算法用于学习。这个过程通常涉及添加或删除列(特征),将列值从文本转换为数值,或者规范化某一列的值。
在MLlib中使用ML算法是有严格要求的;它们要求所有的特征都是Double数据类型,包括标签。
Spark ML提供两种类型的transformers:特征transformer和机器学习模型。