Python大数据之PySpark(八)SparkCore加强

简介: Python大数据之PySpark(八)SparkCore加强

SparkCore加强

  • 重点:RDD的持久化和Checkpoint
  • 提高拓展知识:Spark内核调度全流程,Spark的Shuffle
  • 练习:热力图统计及电商基础指标统计
  • combineByKey作为面试部分重点,可以作为扩展知识点

Spark算子补充

# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    print('PySpark join Function Program')
    # TODO:1、创建应用程序入口SparkContext实例对象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、从本地文件系统创建RDD数据集
    x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
    y = sc.parallelize([(1001, "sales"), (1002, "tech")])
    # TODO:3、使用join完成联合操作
    print(x.join(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    print(x.leftOuterJoin(y).collect())
    print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    sc.stop()

[掌握]RDD 持久化

为什么使用缓存

  • 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算
  • 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency
  • 使用经验:一次缓存可以多次使用

如何进行缓存?

  • spark中提供cache方法
  • spark中提供persist方法
# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':
    print('PySpark join Function Program')
    # TODO:1、创建应用程序入口SparkContext实例对象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、从本地文件系统创建RDD数据集
    x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
    y = sc.parallelize([(1001, "sales"), (1002, "tech")])
    # TODO:3、使用join完成联合操作
    join_result_rdd = x.join(y)
    print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    print(x.leftOuterJoin(y).collect())
    print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    # 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)
    join_result_rdd.cache()
    # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
    # 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识
    join_result_rdd.collect()
    # 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count
    print(join_result_rdd.count())
    time.sleep(600)
    sc.stop()


缓存级别

  • 如何选:
  • 1-首选内存
  • 2-内存放不下,尝试序列化
  • 3-如果算子比较昂贵可以缓存在磁盘中,否则不要直接放入磁盘
  • 4-使用副本机制完成容错性质

释放缓存

  • 后续讲到Spark内存模型中,缓存放在Execution内存模块
  • 如果不在需要缓存的数据,可以释放

  • 最近最少使用(LRU)
print(“释放缓存之后,直接从rdd的依赖链重新读取”)
print(join_result_rdd.count())


* <

何时缓存数据

  • rdd来之不易
  • 经过很长依赖链计算
  • 经过shuffle
  • rdd被使用多次
  • 缓存cache或persist
  • 问题:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质
  • 内存在重启之后没有数据了,磁盘也会数据丢失
  • 注意:缓存会将依赖链进行保存的
  • 如何解决基于cache或persist的存储在易失介质的问题?
  • 引入checkpoint检查点机制
  • 将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制
  • checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算
  • 什么是元数据?
  • 管理数据的数据
  • 比如,数据大小,位置等都是元数据

[掌握]RDD Checkpoint

  • 为什么有检查点机制?
  • 因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题
  • Spark的容错问题?
  • 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint
  • 如何使用检查点机制?
  • 指定数据保存在哪里?
  • sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
  • 对谁缓存?答案算子
  • rdd1.checkpoint() 斩断依赖关系进行检查点
  • 检查点机制触发方式
  • action算子可以触发
  • 后续的计算过程
  • Spark机制直接从checkpoint中读取数据
  • 实验过程还原:
  • 检查点机制那些作用?
  • 将数据和元数据保存在HDFS中
  • 后续执行rdd的计算直接基于checkpoint的rdd
  • 起到了容错的作用
  • 面试题:如何实现Spark的容错?
  • 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据
  • 2-否则查看checkpoint是否保存数据
  • 3-否则根据依赖关系重建RDD
  • 检查点机制案例

持久化和Checkpoint的区别

  • 存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs
  • 生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在
  • 依赖关系:缓存保存依赖关系,检查点斩断依赖关系链

案例测试:

先cache在checkpoint测试

  • 1-读取数据文件
  • 2-设置检查点目录
  • 3-rdd.checkpoint() 和rdd.cache()
  • 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快
  • 5-如果对rdd实现unpersist
  • 6-从checkpoint中读取rdd的数据
  • 7-通过action可以查看时间


相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
2月前
|
存储 分布式计算 大数据
基于Python大数据的的电商用户行为分析系统
本系统基于Django、Scrapy与Hadoop技术,构建电商用户行为分析平台。通过爬取与处理海量用户数据,实现行为追踪、偏好分析与个性化推荐,助力企业提升营销精准度与用户体验,推动电商智能化发展。
|
2月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的台风灾害分析及预测系统
针对台风灾害预警滞后、精度不足等问题,本研究基于Python与大数据技术,构建多源数据融合的台风预测系统。利用机器学习提升路径与强度预测准确率,结合Django框架实现动态可视化与实时预警,为防灾决策提供科学支持,显著提高应急响应效率,具有重要社会经济价值。
|
2月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。
|
机器学习/深度学习 人工智能 大数据
AI时代Python金融大数据分析实战:ChatGPT让金融大数据分析插上翅膀
AI时代Python金融大数据分析实战:ChatGPT让金融大数据分析插上翅膀
766 6
|
Web App开发 SQL Python
书籍:Python金融大数据分析 Python for Finance_ Mastering Data-Driven Finance 2nd - 2019.pdf
简介 金融业最近以极高的速度采用了Python,一些最大的投资银行和对冲基金使用它来构建核心交易和风险管理系统。 针对Python 3进行了更新,本手册的第二版帮助您开始使用该语言,指导开发人员和定量分析师通过Python库和工具构建财务应用程序和交互式财务分析。
|
Python
《Python金融大数据分析》一导读
不久以前,在金融行业,Python作为一种编程语言和平台技术还被视为异端。相比之下,2014年有许多大型金融机构——如美国银行、美林证券的“石英”项目或者摩根大通的“雅典娜”项目——战略性地使用了Python和其他既定的技术,构建、改进和维护其核心IT系统。
2627 0

推荐镜像

更多