大数据 | Spark机器学习工作流开发指南

简介: Spark.ml是在Spark 1.2开始引入的一个包,它旨在提供一套统一的高级API,帮助用户创建和优化实用的机器学习工作流,它在原来的MLlib的基础上进行了大量的改进和优化,让Spark生态更见坚不可摧,本文就来详细介绍一下Spark机器学习工作流的基本概念和用法。

前言

之前我曾写过一篇介绍Spark集群搭建和基本使用的文章,在文中详细的介绍了Spark的来历、优势及搭建过程,Spark以其低时延、速度快、通用性强等优势在大数据处理领域备受欢迎,但是它的强大之处绝不仅仅是因为强大的分布式计算能力。如果你仅仅想借助Spark的并行计算能力,那样我觉得大可不必费尽周折去搭建Spark和HDFS,可以直接借助多线程实现数据的并行处理和读取。

Spark之所以如此强大还得益于它完善的生态系统,它在仓储工具、机器学习等领域也非常完备,这让它能够进一步在大数据上进行一些查找、训练等工作,而不仅仅是作为一个数据读取工具使用。

在Spark 1.2之前它携带的机器学习库是MLlib,它包含了聚类、逻辑回归、SVM等常用的机器学习算法,因此,它也得到了广泛的应用。但是,在机器学习工作流、数据格式、性能方面有所限制,因此,在Spark 1.2之后加入了Spark.ml机器学习库,这是在MLlib之上更高阶的API,它不仅面向DataFrame,在RDD基础上进一步封装,提供更强大更方便的API。另外,比较重要的一点是,它引入了工作流(Pipeline)的概念,将多个单独的算法组合成一个统一的流水线形式,便于实现复杂的机器学习模型,本文就来详细介绍一下Spark.ml机器学习工作流的一些专业术语和基本使用方法。

Spark.ml简介

5.jpg

Spark.ml是在Spark 1.2开始引入的一个包,它旨在提供一套统一的高级API,帮助用户创建和优化实用的机器学习工作流(machine learning pipelines)。

一个完整的机器学习工作流包括多个阶段,

  • 数据预处理
  • 特征抽取
  • 模型训练
  • 交叉验证
  • ......

spark.ml对机器学习算法的API进行了标准化,使得更加容易的将多个算法组合成单个工作流。

在spark.ml中引入了一些关键的专业术语,它们分别是,

ML Dataset:spark.ml不像MLlib那样使用RDD作为数据输入,而是使用更加高阶的DataFrame,它相对于RDD包含了schema信息,它可以支持多种不同的数据类型,例如,文本、特征向量、真实标签、预测标签。

Transformer:它可以被翻译为转换器,是一个工作流阶段(PipelineStage),它的作用是将一个DataFrame转换成另一个DataFrame,从技术上讲Transformer实现了transform()方法。转换器是一种抽象,它包括特征转换器和机器学习模型。

Estimator:它可以被译为评估器,作用是将输入的DataFrame转化成Transformer,在技术上它实现了fit()方法。例如,一个机器学习算法就是Estimator,它的输入为数据集,输出是机器学习模型,这个机器学习模型就是一个Transformer,它能够将一个数据转换成另一个数据。

Pipeline:可以翻译为工作流或者管道,个人认为反义词工作流更加贴切、容易理解。它把多个工作流阶段连接起来,组成一个完整的工作流。例如,简单的文本文档处理工作流可能包括如下几个阶段,

  • 将每个文档的文本拆分为单词。
  • 将每个文档的单词转换成数字特征向量。
  • 使用特征向量和标签学习预测模型。

spark.ml将这样的工作流表示为Pipeline,它按特定的顺序运行一系列Pipeline(Transformer和Estimator)组成。

Param:Param被用来设置 Transformer 或者 Estimator 的参数。

如何运行?

工作流(Pipeline)是只由多个工作阶段组成的序列,每个阶段可以是一个Transformer或者Estimator,这些阶段按照顺序运行,并且输入的数据集在每个阶段时都会被修改。

在Transformer阶段,在数据集上调用transform()方法,在Estimator阶段,调用fit()方法以生成一个Transformer(它将成为PipelineModel或者拟合Pipeline的一部分), 并且在数据集上调用该Transformer的transform()方法。

下图是工作流训练时的使用情况,用来说明简单的文本文档处理工作流程,

6.png

在上图中,第一行代表了三个阶段的工作流,前两个(Tokenizer和HashingTF)是转换器(蓝色框),第三个阶段(Logistic Regression)是评估器(红色框)。

第二行代表了流经工作流的数据情况,其中圆柱体代表DataFrame。Tokenizer.transform()方法将原始文本文档(Raw text)拆分为单词,在数据集中添加带有单词的新列。HashingTF.transform()方法将words列转换为特征向量,并将带有这些向量的新列添加到数据集。然后,由于LogisticRegression是Estimator,因此管道首先调用LogisticRegression.fit()来生成LogisticRegressionModel。如果管道有更多阶段,则在将数据集传递到下一阶段之前,将在数据集中调用LogisticRegressionModel的transform()方法。

经过上述工作流,将产生一个PipelineModel,它是一个Transformer,这个模型在测试数据上使用的工作流如下,


7.png


在上图中,PipelineModel和原始Pipeline阶段数相同,但是原始Pipeline中的所有Estimator都已变为Transformer。在测试数据集上调用PipelineModel的transform()方法时,数据将按顺序传递给工作流。每个阶段的transform()方法都会更新数据集,并将其传递到下一个阶段。

Spark.ML实战

下面就以一个简单的实例来构造一个逻辑回归(LogisticRegression)工作流,来加深一下对上述流程的理解和印象。

在spark.ml实战中假设这样一个场景,一个句子(sentence)中,如果包含单词"flower",那么它的标签为1,否则为0。

导入模块

从前面讲解中我们可以清楚的知道,在构造这个工作流中,我们会用到Tokenizer、HashingTF、DataFrame、LogisticRegression、Pipeline,我们在程序开头先导入这些模块,


         

构造数据集

了解了示例背景,下面我们分别来构造训练集DataFrame和测试集DataFrame,


         

构建Pipeline

我们首先要来构造上面第一个图中的Pipeline(Estimator),在构建这个工作流时我们需要用到Tokenizer、HashingTF进行分词和抽取特征,然后调用逻辑回归算法,生成PipelineModel,


         

然后通过Pipeline把上述三个阶段连接起来,构成工作流,


         

可以看出,上面输出一个PipelineModel。

构建PipelineModel

下面要完成的就是上面第二幅图中的工作流,在测试集上面首先调用PipelineModel,然后通过多个阶段的数据传递,最终输出预测结果,


         

上述就是基本的Spark.ml应用实例。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
5天前
|
机器学习/深度学习 人工智能 自然语言处理
大数据分析的技术和方法:从深度学习到机器学习
大数据时代的到来,让数据分析成为了企业和组织中不可或缺的一环。如何高效地处理庞大的数据集并且从中发现潜在的价值是每个数据分析师都需要掌握的技能。本文将介绍大数据分析的技术和方法,包括深度学习、机器学习、数据挖掘等方面的应用,以及如何通过这些技术和方法来解决实际问题。
65 2
|
5天前
|
机器学习/深度学习 存储 分布式计算
机器学习PAI关于maxcompute上用protobuf 处理数据,比较方便的方式
机器学习PAI关于maxcompute上用protobuf 处理数据,比较方便的方式
|
5天前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
168 0
|
5天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
5天前
|
机器学习/深度学习 分布式计算 算法
Spark中的机器学习库MLlib是什么?请解释其作用和常用算法。
Spark中的机器学习库MLlib是什么?请解释其作用和常用算法。
45 0
|
5天前
|
机器学习/深度学习 分布式计算 算法
使用Spark进行机器学习
【5月更文挑战第2天】使用Spark进行机器学习
20 2
|
5天前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
5天前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之spark3.1.1通过resource目录下的conf文件配置,报错如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5天前
|
机器学习/深度学习 SQL 人工智能
人工智能平台PAI 操作报错合集之机器学习PAI缺失值补充报错,从odps读取数据正常 进行下一步时,补充缺失值报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
5天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
143 0

热门文章

最新文章