Spark机器学习管道 - Pipeline

简介: Spark机器学习管道 - Pipeline

一、实验目的

掌握使用Spark机器学习管道创建小型机器学习工作流

二、实验内容

1、构建一个机器学习管道,应用LogisticRegression算法,预测一行文本中是否出现了”spark”这个单词。

三、实验原理

Spark ML有一个名为Pipeline的类,它被设计用来管理一系列的阶段,每一个阶段都由PipelineStage来表示。一个PipelineStage既可以是transformer,也可以是estimator。抽象Pipeline是一种estimator。管道以指定的顺序连接多个transformers和estimators,形成机器学习工作流。从概念上讲,它将机器学习工作流中的数据预处理、特征提取和模型训练步骤链接在一起。

 管道由一系列阶段组成,每个阶段都是一个Transformer或一个Estimator。它按照指定的顺序运行这些阶段。

 下图描述了一个使用管道创建一个小型工作流。

37d39efdc55146a79dd8728bb8dc604e.png


四、实验环境

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


五、实验步骤

5.1 启动Spark集群和Zeppelin服务器。

在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh
3.  $ zeppelin-daemon.sh start

然后使用jps命令查看启动的进程,确保Spark集群和Zeppelin服务器已经正确启动。

 2、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页,点击”Create new note”链接,创建一个新的笔记本。如下图所示:

231d3a59844346efa050975b7b5e6061.png


5.2 使用管道创建一个小型工作流。

这个示例中,管道由两个transformers和一个estimator组成。当调用Pipeline.fit()函数时,包含原始文本的输入DataFrame将被传递给Tokenizer transformer,其输出将被传递到HashingTF transformer,它将单词转换为特征。该Pipeline认识到LogisticRegression是一个estimator,因此它将调用fit函数和计算特征来产生一个LogisticRegressionModel。

 1、导入所需的包。在zeppelin中输入以下代码:

1.  import org.apache.spark.ml.{Pipeline, PipelineModel}
2.  import org.apache.spark.ml.classification.LogisticRegression
3.  import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

同时按下”【Shift + Enter】”键,执行以上代码。

 2、构造一个DataFrame。在zeppelin中输入以下代码:

1.  val text_data = spark.createDataFrame(Seq(
2.                          (1, "Spark is a unified data analytics engine", 0.0),
3.                          (2, "Spark is cool and it is fun to work with Spark", 0.0),
4.                          (3, "There is a lot of exciting sessions at upcoming Spark summit", 0.0),
5.                          (4, "signup to win a million dollars", 0.0) ) 
6.                      ).toDF("id", "line", "label")

同时按下”【Shift + Enter】”键,执行以上代码。

 3、构造第一个阶段transformer。在zeppelin中输入以下代码:

1.  val tokenizer = new Tokenizer().setInputCol("line").setOutputCol("words")

同时按下”【Shift + Enter】”键,执行以上代码。

 4、构造第二个阶段transformer(第一个阶段的输出作为第二个阶段的输入)。在zeppelin中输入以下代码:

1.  val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol)
2.                                 .setOutputCol("features")
3.                                 .setNumFeatures(4096)

同时按下”【Shift + Enter】”键,执行以上代码。

 5、构造第三个阶段estimator,代表逻辑回归算法实现。在zeppelin中输入以下代码:

1.  val logisticReg = new LogisticRegression().setMaxIter(5).setRegParam(0.01)

同时按下”【Shift + Enter】”键,执行以上代码。

 6、构造一个管道,由以上三个阶段组成。在zeppelin中输入以下代码:

1.  val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, logisticReg))

同时按下”【Shift + Enter】”键,执行以上代码。

 7、触发各阶段的顺序执行。在zeppelin中输入以下代码:

1.  val logisticRegModel = pipeline.fit(text_data)

同时按下”【Shift + Enter】”键,执行以上代码。

 8、使用学习到的模型对数据进行转换。在zeppelin中输入以下代码:

1.  logisticRegModel.transform(text_data).show

同时按下”【Shift + Enter】”键,执行以上代码。输出结果如下:

52bdbbf8e4884debb5d90147d30b1062.png


结束语

Pipeline的fit方法调用每个Transformer的transform方法和每个Estimator的fit方法,与创建管道时指定的顺序相同。每个Transformer接受一个DataFrame作为输入,并返回一个新的DataFrame,它将成为管道中下一阶段的输入。如果一个阶段是一个Estimator,则调用它的fit方法来训练一个模型。返回的模型是一个Transformer,用于将前一阶段的输出转换为下一阶段的输入。管道本身也是一个Estimator,其fit方法返回一个PipelineModel,它是一个Transformer。


相关文章
|
5月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
|
5月前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。
|
4月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
35 0
|
5月前
|
机器学习/深度学习 分布式计算 算法
使用Spark进行机器学习
【5月更文挑战第2天】使用Spark进行机器学习
58 2
|
5月前
|
机器学习/深度学习 分布式计算 算法
Spark MLlib简介与机器学习流程
Spark MLlib简介与机器学习流程
|
分布式计算 大数据 Spark
【Spark Summit East 2017】基于Spark ML和GraphFrames的大规模文本分析管道
本讲义出自Alexey Svyatkovskiy在Spark Summit East 2017上的演讲,主要介绍了基于Spark ML和GraphFrames的大规模文本分析管道的实现,并介绍了用于的描绘直方图、计算描述性统计的跨平台的Scala数据聚合基元——Histogrammar package,并分享了非结构化数据处理、高效访问的数据存储格式以及大规模图处理等问题。
2132 0
|
13天前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
36 0
|
13天前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
29 0
|
13天前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
43 0
|
2天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
6 1