Python大数据之PySpark(八)SparkCore加强

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: Python大数据之PySpark(八)SparkCore加强

SparkCore加强

  • 重点:RDD的持久化和Checkpoint
  • 提高拓展知识:Spark内核调度全流程,Spark的Shuffle
  • 练习:热力图统计及电商基础指标统计
  • combineByKey作为面试部分重点,可以作为扩展知识点

Spark算子补充

# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    print('PySpark join Function Program')
    # TODO:1、创建应用程序入口SparkContext实例对象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、从本地文件系统创建RDD数据集
    x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
    y = sc.parallelize([(1001, "sales"), (1002, "tech")])
    # TODO:3、使用join完成联合操作
    print(x.join(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    print(x.leftOuterJoin(y).collect())
    print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    sc.stop()

[掌握]RDD 持久化

为什么使用缓存

  • 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算
  • 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency
  • 使用经验:一次缓存可以多次使用

如何进行缓存?

  • spark中提供cache方法
  • spark中提供persist方法
# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':
    print('PySpark join Function Program')
    # TODO:1、创建应用程序入口SparkContext实例对象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、从本地文件系统创建RDD数据集
    x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
    y = sc.parallelize([(1001, "sales"), (1002, "tech")])
    # TODO:3、使用join完成联合操作
    join_result_rdd = x.join(y)
    print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    print(x.leftOuterJoin(y).collect())
    print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    # 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)
    join_result_rdd.cache()
    # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
    # 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识
    join_result_rdd.collect()
    # 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count
    print(join_result_rdd.count())
    time.sleep(600)
    sc.stop()


缓存级别

  • 如何选:
  • 1-首选内存
  • 2-内存放不下,尝试序列化
  • 3-如果算子比较昂贵可以缓存在磁盘中,否则不要直接放入磁盘
  • 4-使用副本机制完成容错性质

释放缓存

  • 后续讲到Spark内存模型中,缓存放在Execution内存模块
  • 如果不在需要缓存的数据,可以释放

  • 最近最少使用(LRU)
print(“释放缓存之后,直接从rdd的依赖链重新读取”)
print(join_result_rdd.count())


* <

何时缓存数据

  • rdd来之不易
  • 经过很长依赖链计算
  • 经过shuffle
  • rdd被使用多次
  • 缓存cache或persist
  • 问题:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质
  • 内存在重启之后没有数据了,磁盘也会数据丢失
  • 注意:缓存会将依赖链进行保存的
  • 如何解决基于cache或persist的存储在易失介质的问题?
  • 引入checkpoint检查点机制
  • 将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制
  • checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算
  • 什么是元数据?
  • 管理数据的数据
  • 比如,数据大小,位置等都是元数据

[掌握]RDD Checkpoint

  • 为什么有检查点机制?
  • 因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题
  • Spark的容错问题?
  • 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint
  • 如何使用检查点机制?
  • 指定数据保存在哪里?
  • sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
  • 对谁缓存?答案算子
  • rdd1.checkpoint() 斩断依赖关系进行检查点
  • 检查点机制触发方式
  • action算子可以触发
  • 后续的计算过程
  • Spark机制直接从checkpoint中读取数据
  • 实验过程还原:
  • 检查点机制那些作用?
  • 将数据和元数据保存在HDFS中
  • 后续执行rdd的计算直接基于checkpoint的rdd
  • 起到了容错的作用
  • 面试题:如何实现Spark的容错?
  • 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据
  • 2-否则查看checkpoint是否保存数据
  • 3-否则根据依赖关系重建RDD
  • 检查点机制案例

持久化和Checkpoint的区别

  • 存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs
  • 生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在
  • 依赖关系:缓存保存依赖关系,检查点斩断依赖关系链

案例测试:

先cache在checkpoint测试

  • 1-读取数据文件
  • 2-设置检查点目录
  • 3-rdd.checkpoint() 和rdd.cache()
  • 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快
  • 5-如果对rdd实现unpersist
  • 6-从checkpoint中读取rdd的数据
  • 7-通过action可以查看时间


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
打赏
0
0
0
0
111
分享
相关文章
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
156 35
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
205 7
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
147 2
Python 高级编程与实战:深入理解性能优化与调试技巧
本文深入探讨了Python的性能优化与调试技巧,涵盖profiling、caching、Cython等优化工具,以及pdb、logging、assert等调试方法。通过实战项目,如优化斐波那契数列计算和调试Web应用,帮助读者掌握这些技术,提升编程效率。附有进一步学习资源,助力读者深入学习。
Python 高级编程与实战:深入理解数据科学与机器学习
本文深入探讨了Python在数据科学与机器学习中的应用,介绍了pandas、numpy、matplotlib等数据科学工具,以及scikit-learn、tensorflow、keras等机器学习库。通过实战项目,如数据可视化和鸢尾花数据集分类,帮助读者掌握这些技术。最后提供了进一步学习资源,助力提升Python编程技能。
|
14天前
|
[oeasy]python074_ai辅助编程_水果程序_fruits_apple_banana_加法_python之禅
本文回顾了从模块导入变量和函数的方法,并通过一个求和程序实例,讲解了Python中输入处理、类型转换及异常处理的应用。重点分析了“明了胜于晦涩”(Explicit is better than implicit)的Python之禅理念,强调代码应清晰明确。最后总结了加法运算程序的实现过程,并预告后续内容将深入探讨变量类型的隐式与显式问题。附有相关资源链接供进一步学习。
25 4

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等