大数据 | Spark机器学习工作流开发指南

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Spark.ml是在Spark 1.2开始引入的一个包,它旨在提供一套统一的高级API,帮助用户创建和优化实用的机器学习工作流,它在原来的MLlib的基础上进行了大量的改进和优化,让Spark生态更见坚不可摧,本文就来详细介绍一下Spark机器学习工作流的基本概念和用法。

前言

之前我曾写过一篇介绍Spark集群搭建和基本使用的文章,在文中详细的介绍了Spark的来历、优势及搭建过程,Spark以其低时延、速度快、通用性强等优势在大数据处理领域备受欢迎,但是它的强大之处绝不仅仅是因为强大的分布式计算能力。如果你仅仅想借助Spark的并行计算能力,那样我觉得大可不必费尽周折去搭建Spark和HDFS,可以直接借助多线程实现数据的并行处理和读取。

Spark之所以如此强大还得益于它完善的生态系统,它在仓储工具、机器学习等领域也非常完备,这让它能够进一步在大数据上进行一些查找、训练等工作,而不仅仅是作为一个数据读取工具使用。

在Spark 1.2之前它携带的机器学习库是MLlib,它包含了聚类、逻辑回归、SVM等常用的机器学习算法,因此,它也得到了广泛的应用。但是,在机器学习工作流、数据格式、性能方面有所限制,因此,在Spark 1.2之后加入了Spark.ml机器学习库,这是在MLlib之上更高阶的API,它不仅面向DataFrame,在RDD基础上进一步封装,提供更强大更方便的API。另外,比较重要的一点是,它引入了工作流(Pipeline)的概念,将多个单独的算法组合成一个统一的流水线形式,便于实现复杂的机器学习模型,本文就来详细介绍一下Spark.ml机器学习工作流的一些专业术语和基本使用方法。

Spark.ml简介

5.jpg

Spark.ml是在Spark 1.2开始引入的一个包,它旨在提供一套统一的高级API,帮助用户创建和优化实用的机器学习工作流(machine learning pipelines)。

一个完整的机器学习工作流包括多个阶段,

  • 数据预处理
  • 特征抽取
  • 模型训练
  • 交叉验证
  • ......

spark.ml对机器学习算法的API进行了标准化,使得更加容易的将多个算法组合成单个工作流。

在spark.ml中引入了一些关键的专业术语,它们分别是,

ML Dataset:spark.ml不像MLlib那样使用RDD作为数据输入,而是使用更加高阶的DataFrame,它相对于RDD包含了schema信息,它可以支持多种不同的数据类型,例如,文本、特征向量、真实标签、预测标签。

Transformer:它可以被翻译为转换器,是一个工作流阶段(PipelineStage),它的作用是将一个DataFrame转换成另一个DataFrame,从技术上讲Transformer实现了transform()方法。转换器是一种抽象,它包括特征转换器和机器学习模型。

Estimator:它可以被译为评估器,作用是将输入的DataFrame转化成Transformer,在技术上它实现了fit()方法。例如,一个机器学习算法就是Estimator,它的输入为数据集,输出是机器学习模型,这个机器学习模型就是一个Transformer,它能够将一个数据转换成另一个数据。

Pipeline:可以翻译为工作流或者管道,个人认为反义词工作流更加贴切、容易理解。它把多个工作流阶段连接起来,组成一个完整的工作流。例如,简单的文本文档处理工作流可能包括如下几个阶段,

  • 将每个文档的文本拆分为单词。
  • 将每个文档的单词转换成数字特征向量。
  • 使用特征向量和标签学习预测模型。

spark.ml将这样的工作流表示为Pipeline,它按特定的顺序运行一系列Pipeline(Transformer和Estimator)组成。

Param:Param被用来设置 Transformer 或者 Estimator 的参数。

如何运行?

工作流(Pipeline)是只由多个工作阶段组成的序列,每个阶段可以是一个Transformer或者Estimator,这些阶段按照顺序运行,并且输入的数据集在每个阶段时都会被修改。

在Transformer阶段,在数据集上调用transform()方法,在Estimator阶段,调用fit()方法以生成一个Transformer(它将成为PipelineModel或者拟合Pipeline的一部分), 并且在数据集上调用该Transformer的transform()方法。

下图是工作流训练时的使用情况,用来说明简单的文本文档处理工作流程,

6.png

在上图中,第一行代表了三个阶段的工作流,前两个(Tokenizer和HashingTF)是转换器(蓝色框),第三个阶段(Logistic Regression)是评估器(红色框)。

第二行代表了流经工作流的数据情况,其中圆柱体代表DataFrame。Tokenizer.transform()方法将原始文本文档(Raw text)拆分为单词,在数据集中添加带有单词的新列。HashingTF.transform()方法将words列转换为特征向量,并将带有这些向量的新列添加到数据集。然后,由于LogisticRegression是Estimator,因此管道首先调用LogisticRegression.fit()来生成LogisticRegressionModel。如果管道有更多阶段,则在将数据集传递到下一阶段之前,将在数据集中调用LogisticRegressionModel的transform()方法。

经过上述工作流,将产生一个PipelineModel,它是一个Transformer,这个模型在测试数据上使用的工作流如下,


7.png


在上图中,PipelineModel和原始Pipeline阶段数相同,但是原始Pipeline中的所有Estimator都已变为Transformer。在测试数据集上调用PipelineModel的transform()方法时,数据将按顺序传递给工作流。每个阶段的transform()方法都会更新数据集,并将其传递到下一个阶段。

Spark.ML实战

下面就以一个简单的实例来构造一个逻辑回归(LogisticRegression)工作流,来加深一下对上述流程的理解和印象。

在spark.ml实战中假设这样一个场景,一个句子(sentence)中,如果包含单词"flower",那么它的标签为1,否则为0。

导入模块

从前面讲解中我们可以清楚的知道,在构造这个工作流中,我们会用到Tokenizer、HashingTF、DataFrame、LogisticRegression、Pipeline,我们在程序开头先导入这些模块,

 
         

构造数据集

了解了示例背景,下面我们分别来构造训练集DataFrame和测试集DataFrame,

 
         

构建Pipeline

我们首先要来构造上面第一个图中的Pipeline(Estimator),在构建这个工作流时我们需要用到Tokenizer、HashingTF进行分词和抽取特征,然后调用逻辑回归算法,生成PipelineModel,

 
         

然后通过Pipeline把上述三个阶段连接起来,构成工作流,

 
         

可以看出,上面输出一个PipelineModel。

构建PipelineModel

下面要完成的就是上面第二幅图中的工作流,在测试集上面首先调用PipelineModel,然后通过多个阶段的数据传递,最终输出预测结果,

 
         

上述就是基本的Spark.ml应用实例。

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
5月前
|
机器学习/深度学习 数据采集 人工智能
别让“大数据”变成“大忽悠”——聊聊机器学习的真本事
别让“大数据”变成“大忽悠”——聊聊机器学习的真本事
119 9
|
3月前
|
机器学习/深度学习 分布式计算 Java
Java 大视界 -- Java 大数据机器学习模型在遥感图像土地利用分类中的优化与应用(199)
本文探讨了Java大数据与机器学习模型在遥感图像土地利用分类中的优化与应用。面对传统方法效率低、精度差的问题,结合Hadoop、Spark与深度学习框架,实现了高效、精准的分类。通过实际案例展示了Java在数据处理、模型融合与参数调优中的强大能力,推动遥感图像分类迈向新高度。
|
3月前
|
机器学习/深度学习 存储 Java
Java 大视界 -- Java 大数据机器学习模型在游戏用户行为分析与游戏平衡优化中的应用(190)
本文探讨了Java大数据与机器学习模型在游戏用户行为分析及游戏平衡优化中的应用。通过数据采集、预处理与聚类分析,开发者可深入洞察玩家行为特征,构建个性化运营策略。同时,利用回归模型优化游戏数值与付费机制,提升游戏公平性与用户体验。
|
3月前
|
机器学习/深度学习 算法 Java
Java 大视界 -- Java 大数据机器学习模型在舆情分析中的情感倾向判断与话题追踪(185)
本篇文章深入探讨了Java大数据与机器学习在舆情分析中的应用,重点介绍了情感倾向判断与话题追踪的技术实现。通过实际案例,展示了如何利用Java生态工具如Hadoop、Hive、Weka和Deeplearning4j进行舆情数据处理、情感分类与趋势预测,揭示了其在企业品牌管理与政府决策中的重要价值。文章还展望了多模态融合、实时性提升及个性化服务等未来发展方向。
|
机器学习/深度学习 数据采集 算法
Java 大视界 -- Java 大数据机器学习模型在金融衍生品定价中的创新方法与实践(166)
本文围绕 Java 大数据机器学习模型在金融衍生品定价中的应用展开,分析定价现状与挑战,阐述技术原理与应用,结合真实案例与代码给出实操方案,助力提升金融衍生品定价的准确性与效率。
Java 大视界 -- Java 大数据机器学习模型在金融衍生品定价中的创新方法与实践(166)
|
5月前
|
机器学习/深度学习 人工智能 算法
大数据与机器学习:数据驱动的智能时代
本文探讨了大数据与机器学习在数字化时代的融合及其深远影响。大数据作为“新时代的石油”,以其4V特性(体量、多样性、速度、真实性)为机器学习提供燃料,而机器学习通过监督、无监督、强化和深度学习等技术实现数据价值挖掘。两者协同效应显著,推动医疗、金融、零售、制造等行业创新。同时,文章分析了数据隐私、算法偏见、可解释性及能耗等挑战,并展望了边缘计算、联邦学习、AutoML等未来趋势。结语强调技术伦理与实际价值并重,倡导持续学习以把握智能时代机遇。
159 13
|
6月前
|
机器学习/深度学习 数据采集 算法
如何用大数据与机器学习挖掘瞪羚企业认定标准
本文探讨如何利用大数据与机器学习技术挖掘瞪羚企业认定标准。通过阿里云的大数据平台和政策宝资源整合能力,结合机器学习算法分析政策文本,提取关键信息,助力企业精准理解认定标准。文章对比了传统获取方式的局限性与新技术的优势,并以案例说明政策宝在申报中的作用,强调数据整合、模型选择及数据安全的重要性,为企业提供发展方向和政策支持。
|
机器学习/深度学习 自然语言处理 算法
【数据挖掘】金山办公2020校招大数据和机器学习算法笔试题
金山办公2020校招大数据和机器学习算法笔试题的解析,涵盖了编程、数据结构、正则表达式、机器学习等多个领域的题目和答案。
247 10
|
8月前
|
机器学习/深度学习 数据采集 分布式计算
大数据分析中的机器学习基础:从原理到实践
大数据分析中的机器学习基础:从原理到实践
361 3
|
8月前
|
机器学习/深度学习 分布式计算 大数据
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
361 15

热门文章

最新文章

  • 1
    人工智能平台PAI产品使用合集之如何在odps上启动独立的任务
    185
  • 2
    DataWorks操作报错合集之出现报错“odps-0123055:用户脚本异常-Traceback(最后一次调用)”,如何解决
    431
  • 3
    人工智能平台PAI操作报错合集之在ODPS的xxx_dev项目空间调用easyrec训练,需要访问yyy项目空间的OSS,出现报错,是什么导致的
    139
  • 4
    MaxCompute操作报错合集之创建oss外部表时出现了报错:"Semantic analysis exception - external table checking failure, error message:,该怎么办
    413
  • 5
    MaxCompute操作报错合集之在本地用tunnel命令上传excel表格到mc遇到报错: tunnel upload C:\Users***\Desktop\a.xlsx mc里的非分区表名 -s false;该怎么办
    169
  • 6
    DataWorks操作报错合集之数据源同步时,使用脚本模式采集mysql数据到odps中,使用querySql方式采集数据,在脚本中删除了Reader中的column,但是datax还是报错OriginalConfPretreatmentUtil - 您的配置有误。如何解决
    379
  • 7
    MaxCompute操作报错合集之通过UDF调用异常(其他使用http调用正常)。报错:java.lang.NoSuchMethodError:是什么导致的
    178
  • 8
    MaxCompute操作报错合集之查询外部表insert到内部表报错,两表字段一致,是什么原因
    165
  • 9
    MaxCompute操作报错合集之出现报错:invalid dynamic partition value: \ufffd\ufffd\ufffd\ufffd\ufffd\ufffd是什么原因
    369
  • 10
    MaxCompute产品使用合集之如何设置每次返回超过10000行记录
    192