图解大数据 | 工作流与特征工程@Spark机器学习

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 本文介绍Spark中用于大数据机器学习的板块MLlib/ML,讲解机器学习工作流(Pipeline)及其构建方式,并详解讲解基于DataFrame的Spark ML特征工程,包括二值化、定边界离散化、标准化、特征抽取等。

ShowMeAI研究中心

作者:韩信子@ShowMeAI
教程地址http://www.showmeai.tech/tutorials/84
本文地址http://www.showmeai.tech/article-detail/180
声明:版权所有,转载请联系平台与作者并注明出处

收藏ShowMeAI查看更多精彩内容


1.Spark机器学习工作流

1)Spark mllib 与ml

Spark中同样有用于大数据机器学习的板块MLlib/ML,可以支持对海量数据进行建模与应用。

Spark机器学习; Spark机器学习工作流; Spark MLlib 与 ml; 14-1

2)机器学习工作流(Pipeline)

一个典型的机器学习过程,从数据收集开始,要经历多个步骤,才能得到需要的输出。是一个包含多个步骤的流水线式工作:

  • 源数据ETL(抽取、转化、加载)
  • 数据预处理
  • 指标提取
  • 模型训练与交叉验证
  • 新数据预测

MLlib 已足够简单易用,但在一些情况下使用 MLlib 将会让程序结构复杂,难以理解和实现。

  • 目标数据集结构复杂需要多次处理。
  • 对新数据进行预测的时候,需要结合多个已经训练好的单个模型进行综合预测 Spark 1.2 版本之后引入的 ML Pipeline,可以用于构建复杂机器学习工作流应用。

以下是几个重要概念的解释:

(1)DataFrame

使用Spark SQL中的 DataFrame 作为数据集,可以容纳各种数据类型。较之 RDD,DataFrame 包含了 schema 信息,更类似传统数据库中的二维表格。

它被 ML Pipeline 用来存储源数据,例如DataFrame 中的列可以是存储的文本、特征向量、真实标签和预测的标签等。

(2)Transformer(转换器)

是一种可以将一个DataFrame 转换为另一个DataFrame 的算法。比如,一个模型就是一个 Transformer,它可以把一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。

技术上,Transformer实现了一个方法transform(),通过附加一个或多个列将一个 DataFrame 转换为另一个DataFrame。

(3)Estimator(估计器/评估器)

是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据,并生产一个 Transformer。从技术上讲,Estimator 实现了一个方法fit(),它接受一个DataFrame 并产生一个Transformer转换器。

(4)Parameter

Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有 Transformer(转换器)和Estimator(估计器)可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

(5)PipeLine(工作流/管道)

工作流将多个工作流阶段( Transformer转换器和Estimator估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

3)构建一个Pipeline工作流

Spark机器学习; Spark机器学习工作流; 构建一个Pipeline工作流; 14-2

val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…))

① 首先需要定义 Pipeline 中的各个PipelineStage(工作流阶段)

  • 包括Transformer转换器 和Estimator评估器。
  • 比如指标提取 和 转换模型训练。
  • 有了这些处理特定问题的Transformer转换器和 Estimator评估器,就可以按照具体的处理逻辑,有序地组织PipelineStages,并创建一个Pipeline。

② 然后,可以把训练数据集作为入参,并调用 Pipelin 实例的 fit 方法,开始以流的方式来处理源训练数据

  • 这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签

③ 工作流的各个阶段按顺序运行,输入的DataFrame在它通过每个阶段时被转换

  • 对于 Transformer转换器阶段,在DataFrame上调用 transform() 方法。
  • 对于Estimator估计器阶段,调用fit()方法来生成一个转换器(它成为PipelineModel的一部分或拟合的Pipeline),并且在DataFrame上调用该转换器的 transform()方法。

4)构建Pipeline示例

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row  # type: ignore
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )

2.基于DataFrame的Spark ML特征工程

1)特征工程

Spark机器学习; 基于DataFrame的Spark ML特征工程; 特征工程; 14-3

2)二值化

Spark机器学习; 基于DataFrame的Spark ML特征工程; 二值化; 核心代码&运行结果; 14-4

continuousDataFrame = spark.createDataFrame([(0, 1.1),(1, 8.5),(2, 5.2)], ["id", "feature"])
binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

3)定边界离散化

Spark机器学习; 基于DataFrame的Spark ML特征工程; 定边界离散化; 核心代码&运行结果 ; 14-5

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")] 
data = [(-999.9,),(-0.5,),(-0.3,),(0.0,),(0.2,),(999.9,)] 
dataFrame = spark.createDataFrame(data, ["features"]) 
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures") 

# 按照给定的边界进行分桶 
bucketedData = bucketizer.transform(dataFrame)

4)按照分位数离散化

Spark机器学习; 基于DataFrame的Spark ML特征工程; 按照分位数离散化; 核心代码&运行结果 ; 14-6

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
df = df.repartition(1)

# 分成3个桶进行离散化
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)

5)连续值幅度缩放

Spark机器学习; 基于DataFrame的Spark ML特征工程; 连续值幅度缩放; 核心代码&运行结果; 14-7

dataFrame = spark.createDataFrame([ 
(0, Vectors.dense([1.0, 0.1, -8.0]),), 
(1, Vectors.dense([2.0, 1.0, -4.0]),), 
(2, Vectors.dense([4.0, 10.0, 8.0]),) 
], ["id", "features"]) 
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures") 

# 计算最大绝对值用于缩放 
scalerModel = scaler.fit(dataFrame) 

# 缩放幅度到[-1, 1]之间 
scaledData = scalerModel.transform(dataFrame)

6)标准化

Spark机器学习; 基于DataFrame的Spark ML特征工程; 标准化; 核心代码&运行结果; 14-8

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") 
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False) 

# 计算均值方差等参数 
scalerModel = scaler.fit(dataFrame) 

# 标准化 
scaledData = scalerModel.transform(dataFrame)

7)添加多项式特征

Spark机器学习; 基于DataFrame的Spark ML特征工程; 添加多项式特征; 核心代码&运行结果; 14-9

df = spark.createDataFrame([(Vectors.dense([2.0, 1.0]),), (Vectors.dense([0.0, 0.0]),), (Vectors.dense([3.0, -1.0]),)], ["features"]) 
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures") 
polyDF = polyExpansion.transform(df)

8)类别型独热向量编码

Spark机器学习; 基于DataFrame的Spark ML特征工程; 类别型独热向量编码; 核心代码&运行结果; 14-10

df = spark.createDataFrame([ (0,"a"), (1,"b"), (2,"c"), (3,"a"), (4,"a"), (5,"c")], ["id","category"]) 
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex") 
model = stringIndexer.fit(df) 
indexed = model.transform(df) 

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec") 
encoded = encoder.transform(indexed)

9)文本型特征抽取

Spark机器学习; 基于DataFrame的Spark ML特征工程; 文本型特征抽取; 核心代码&运行结果; 14-11

df = spark.createDataFrame([(0, "a b c".split(" ")), (1, "a b b c a".split(" "))], ["id", "words"]) 
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0) 
model = cv.fit(df) 
result = model.transform(df)

10)文本型特征抽取

Spark机器学习; 基于DataFrame的Spark ML特征工程; 文本型特征抽取; 核心代码&运行结果; 14-12

sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"), 
(0.0, "I wish Java could use case classes"), 
(1.0, "Logistic regression models are neat") 
], ["label", "sentence"]) 

tokenizer = Tokenizer(inputCol="sentence", outputCol="words") 
wordsData = tokenizer.transform(sentenceData) 
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20) 
featurizedData = hashingTF.transform(wordsData) 
idf = IDF(inputCol="rawFeatures", outputCol="features") 
idfModel = idf.fit(featurizedData) 
rescaledData = idfModel.transform(featurizedData)

3.参考资料

【大数据技术与处理】推荐阅读

ShowMeAI 系列教程推荐

ShowMeAI用知识加速每一次技术成长

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
8天前
|
机器学习/深度学习 数据采集 算法
如何用大数据与机器学习挖掘瞪羚企业认定标准
本文探讨如何利用大数据与机器学习技术挖掘瞪羚企业认定标准。通过阿里云的大数据平台和政策宝资源整合能力,结合机器学习算法分析政策文本,提取关键信息,助力企业精准理解认定标准。文章对比了传统获取方式的局限性与新技术的优势,并以案例说明政策宝在申报中的作用,强调数据整合、模型选择及数据安全的重要性,为企业提供发展方向和政策支持。
|
2月前
|
机器学习/深度学习 数据采集 人工智能
容器化机器学习流水线:构建可复用的AI工作流
本文介绍了如何构建容器化的机器学习流水线,以提高AI模型开发和部署的效率与可重复性。首先,我们探讨了机器学习流水线的概念及其优势,包括自动化任务、确保一致性、简化协作和实现CI/CD。接着,详细说明了使用Kubeflow Pipelines在Kubernetes上构建流水线的步骤,涵盖安装、定义流水线、构建组件镜像及上传运行。容器化流水线不仅提升了环境一致性和可移植性,还通过资源隔离和扩展性支持更大规模的数据处理。
|
2月前
|
机器学习/深度学习 数据采集 分布式计算
大数据分析中的机器学习基础:从原理到实践
大数据分析中的机器学习基础:从原理到实践
106 3
|
2月前
|
机器学习/深度学习 算法 搜索推荐
机器学习“捷径”:自动特征工程全面解析
​ 在机器学习项目中,特征工程是影响模型性能的关键步骤。它通过从原始数据中提取出更有用的特征,帮助模型更好地捕捉数据中的模式。然而,传统的特征工程过程往往需要大量的领域知识和实验调整,是一项耗时费力的工作。 近年来,自动特征工程(Automated Feature Engineering)技术的兴起,为这一问题提供了新的解决方案。它旨在通过自动化方法从数据中生成和选择最优特征,使得特征工程过程更加高效。本文将详细介绍自动特征工程的基本概念、常用技术、工具,并通过代码示例展示其实际应用。
|
4月前
|
机器学习/深度学习 数据可视化 大数据
机器学习与大数据分析的结合:智能决策的新引擎
机器学习与大数据分析的结合:智能决策的新引擎
384 15
|
5月前
|
机器学习/深度学习 数据采集 数据处理
Scikit-learn Pipeline完全指南:高效构建机器学习工作流
Scikit-learn管道是构建高效、鲁棒、可复用的机器学习工作流程的利器。通过掌握管道的使用,我们可以轻松地完成从数据预处理到模型训练、评估和部署的全流程,极大地提高工作效率。
88 2
Scikit-learn Pipeline完全指南:高效构建机器学习工作流
|
5月前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
806 5
|
6月前
|
机器学习/深度学习 数据采集 数据挖掘
特征工程在营销组合建模中的应用:基于因果推断的机器学习方法优化渠道效应估计
因果推断方法为特征工程提供了一个更深层次的框架,使我们能够区分真正的因果关系和简单的统计相关性。这种方法在需要理解干预效果的领域尤为重要,如经济学、医学和市场营销。
197 1
特征工程在营销组合建模中的应用:基于因果推断的机器学习方法优化渠道效应估计
|
6月前
|
机器学习/深度学习 自然语言处理 算法
大数据与机器学习
大数据与机器学习紧密相关,前者指代海量、多样化且增长迅速的数据集,后者则是使计算机通过数据自动学习并优化的技术。大数据涵盖结构化、半结构化及非结构化的信息,其应用广泛,包括商业智能、金融和医疗保健等领域;而机器学习分为监督学习、无监督学习及强化学习,被应用于图像识别、自然语言处理和推荐系统等方面。二者相结合,能有效提升数据分析的准确性和效率,在智能交通、医疗及金融科技等多个领域创造巨大价值。
387 2
|
8月前
|
机器学习/深度学习 SQL 数据采集
"解锁机器学习数据预处理新姿势!SQL,你的数据金矿挖掘神器,从清洗到转换,再到特征工程,一网打尽,让数据纯净如金,模型性能飙升!"
【8月更文挑战第31天】在机器学习项目中,数据质量至关重要,而SQL作为数据预处理的强大工具,助力数据科学家高效清洗、转换和分析数据。通过去除重复记录、处理缺失值和异常值,SQL确保数据纯净;利用数据类型转换和字符串操作,SQL重塑数据结构;通过复杂查询生成新特征,SQL提升模型性能。掌握SQL,就如同拥有了开启数据金矿的钥匙,为机器学习项目奠定坚实基础。
77 0

热门文章

最新文章