Python大数据之PySpark(四)SparkBase&Core

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: Python大数据之PySpark(四)SparkBase&Core

SparkBase&Core

  • 学习目标
  • 掌握SparkOnYarn搭建
  • 掌握RDD的基础创建及相关算子操作
  • 了解PySpark的架构及角色

环境搭建-Spark on YARN

  • Yarn 资源调度框架,提供如何基于RM,NM,Continer资源调度
  • Yarn可以替换Standalone结构中Master和Worker来使用RM和NM来申请资源

SparkOnYarn本质

  • Spark计算任务通过Yarn申请资源,SparkOnYarn
  • 将pyspark文件,经过Py4J(Python for java)转换,提交到Yarn的JVM中去运行

修改配置

  • 思考,如何搭建SparkOnYarn环境?
  • 1-需要让Spark知道Yarn(yarn-site.xml)在哪里?
  • 在哪个文件下面更改?spark-env.sh中增加YARN_CONF_DIR的配置目录

  • 2-修改Yan-site.xml配置,管理内存检查,历史日志服务器等其他操作
  • 修改配置文件

  • 3-需要配置历史日志服务器
  • 需要实现功能:提交到Yarn的Job可以查看19888的历史日志服务器可以跳转到18080的日志服务器上
  • 因为19888端口无法查看具体spark的executor后driver的信息,所以搭建历史日志服务器跳转
  • 3-需要准备SparkOnYarn的需要Jar包,配置在配置文件中
  • 在spark-default.conf中设置spark和yarn映射的jar包文件夹(hdfs)

  • 注意,在最终执行sparkonyarn的job的时候一定重启Hadoop集群,因为更改相关yarn配置
  • 4-执行SparkOnYarn
  • 这里并不能提供交互式界面,只有spark-submit(提交任务)
#基于SparkOnyarn提交任务
bin/spark-submit \
--master yarn \
/export/server/spark/examples/src/main/python/pi.py  \
10

小结

SparKOnYarn:使用Yarn提供了资源的调度和管理工作,真正执行计算的时候Spark本身

Master和Worker的结构是Spark Standalone结构 使用Master申请资源,真正申请到是Worker节点的Executor的Tasks线程

原来Master现在Yarn替换成ResourceManager,现在Yarn是Driver给ResourceManager申请资源

原来Worker现在Yarn替换为Nodemanager,最终提供资源的地方时hiNodeManager的Continer容器中的tasks

安装配置:

1-让spark知道yarn的位置

2-更改yarn的配置,这里需要开启历史日志服务器和管理内存检查

3-整合Spark的历史日志服务器和Hadoop的历史日志服务器,效果:通过8088的yarn的http://node1:8088/cluster跳转到18080的spark的historyserver上

4-SparkOnYarn需要将Spark的jars目录下的jar包传递到hdfs上,并且配置spark-default.conf让yarn知晓配置

5-测试,仅仅更换–master yarn

部署模式

#如果启动driver程序是在本地,称之为client客户端模式,现象:能够在client端看到结果

#如果在集群模式中的一台worker节点上启动driver,称之为cluser集群模式,现象:在client端看不到结果

  • client

  • 首先 client客户端提交spark-submit任务,其中spark-submit指定–master资源,指定–deploy-mode模式
  • 由启动在client端的Driver申请资源,
  • 交由Master申请可用Worker节点的Executor中的Task线程
  • 一旦申请到Task线程,将资源列表返回到Driver端
  • Driver获取到资源后执行计算,执行完计算后结果返回到Driver端
  • 由于Drivr启动在client端的,能够直接看到结果
  • 实验:
#基于Standalone的脚本—部署模式client
#driver申请作业的资源,会向–master集群资源管理器申请
#执行计算的过程在worker中,一个worker有很多executor(进程),一个executor下面有很多task(线程)
bin/spark-submit
–master spark://node1:7077
–deploy-mode client
–driver-memory 512m
–executor-memory 512m
/export/server/spark/examples/src/main/python/pi.py
10


  • cluster

  • 首先 client客户端提交spark-submit任务,其中spark-submit指定–master资源,指定–deploy-mode模式
  • 由于指定cluster模式,driver启动在worker节点上
  • 由driver申请资源,由Master返回worker可用资源列表
  • 由Driver获取到资源执行后续计算
  • 执行完计算的结果返回到Driver端,
  • 由于Driver没有启动在客户端client端,在client看不到结果
  • 如何查看数据结果?
  • 需要在日志服务器上查看,演示
  • 实验:
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit
–master spark://node1.itcast.cn:7077,node2.itcast.cn:7077
–deploy-mode cluster
–driver-memory 512m
–executor-memory 512m
–num-executors 1
–total-executor-cores 2
–conf “spark.pyspark.driver.python=/root/anaconda3/bin/python3”
–conf “spark.pyspark.python=/root/anaconda3/bin/python3”
${SPARK_HOME}/examples/src/main/python/pi.py
10



         

         


         
  • 注意事项:
  • 通过firstpyspark.py写的wordcount的代码,最终也是转化为spark-submit任务提交
  • 如果是spark-shell中的代码最终也会转化为spark-submit的执行脚本
  • 在Spark-Submit中可以提交driver的内存和cpu,executor的内存和cpu,–deploy-mode部署模式

Spark On Yarn两种模式

  • Spark on Yarn两种模式
  • –deploy-mode client和cluster
  • Yarn的回顾:Driver------AppMaster------RM-----NodeManager—Continer----Task
  • client模式
#deploy-mode的结构
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit
–master yarn
–deploy-mode client
–driver-memory 512m
–driver-cores 2
–executor-memory 512m
–executor-cores 1
–num-executors 2
–queue default
${SPARK_HOME}/examples/src/main/python/pi.py
10


#瘦身
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit
–master yarn
–deploy-mode client
${SPARK_HOME}/examples/src/main/python/pi.py
10



         

         

  • 原理:

  • 1-启动Driver
  • 2-由Driver向RM申请启动APpMaster
  • 3-由RM指定NM启动AppMaster
  • 4-AppMaster应用管理器申请启动Executor(资源的封装,CPU,内存)
  • 5-由AppMaster指定启动NodeManager启动Executor
  • 6-启动Executor进程,获取任务计算所需的资源
  • 7-将获取的资源反向注册到Driver
  • 由于Driver启动在Client客户端(本地),在Client端就可以看到结果3.1415
  • 8-Driver负责Job和Stage的划分[了解]
  • 1-执行到Action操作的时候会触发Job,不如take
  • 2-接下来通过DAGscheduler划分Job为Stages,为每个stage创建task
  • 3-接下来通过TaskScheduler将每个Stage的task分配到每个executor去执行
  • 4-结果返回到Driver端,得到结果
  • cluster:
  • 作业:
${SPARK_HOME}/bin/spark-submit
–master yarn
–deploy-mode cluster
–driver-memory 512m
–executor-memory 512m
–executor-cores 1
–num-executors 2
–queue default
–conf “spark.pyspark.driver.python=/root/anaconda3/bin/python3”
–conf “spark.pyspark.python=/root/anaconda3/bin/python3”
${SPARK_HOME}/examples/src/main/python/pi.py
10
#瘦身
${SPARK_HOME}/bin/spark-submit
–master yarn
–deploy-mode cluster
${SPARK_HOME}/examples/src/main/python/pi.py
10



         

>>*



原理:

扩展阅读:两种模式详细流程

扩展阅读-Spark关键概念

扩展阅读:Spark集群角色

  • Executor通过启动多个线程(task)来执行对RDD的partition进行并行计算
  • 也就是执行我们对RDD定义的例如map、flatMap、reduce等算子操作。
  • Driver:启动SparkCOntext的地方称之为Driver,Driver需要向CLusterManager申请资源,同时获取到资源后会划分Stage提交Job
  • Master:l 主要负责资源的调度和分配,并进行集群的监控等职责;
  • worker:一个是用自己的内存存储RDD的某个或某些partition;另一个是启动其他进程和线程(Executor),对RDD上的partition进行并行的处理和计算
  • Executor:一个Worker****(NodeManager)****上可以运行多个Executor,Executor通过启动多个线程(task)来执行对RDD的partition进行并行计算
  • 每个Task线程都会拉取RDD的每个分区执行计算,可以执行并行计算

扩展阅读:Spark-shell和Spark-submit

  • bin/spark-shell --master spark://node1:7077 --driver-memory 512m --executor-memory 1g
  • # SparkOnYarn组织参数
–driver-memory MEM 默认1g,Memory for driver (e.g. 1000M, 2G) (Default: 1024M). Driver端的内存
–driver-cores NUM 默认1个,Number of cores used by the driver, only in cluster mode(Default: 1).
–num-executors NUM 默认为2个,启动多少个executors
–executor-cores NUM 默认1个,Number of cores used by each executor,每个executou需要多少cpucores
–executor-memory 默认1G,Memory per executor (e.g. 1000M, 2G) (Default: 1G) ,每个executour的内存
–queue QUEUE_NAME The YARN queue to submit to (Default: “default”).
bin/spark-submit --master yarn \
–deploy-mode cluster \
–driver-memory 1g \
–driver-cores 2 \
–executor-cores 4 \
–executor-memory 512m \
–num-executors 10 \
path/XXXXX.py \
10

扩展阅读:命令参数

–driver-memory MEM 默认1g,Memory for driver (e.g. 1000M, 2G) (Default: 1024M). Driver端的内存
–driver-cores NUM 默认1个,Number of cores used by the driver, only in cluster mode(Default: 1).
–num-executors NUM 默认为2个,启动多少个executors
–executor-cores NUM 默认1个,Number of cores used by each executor,每个executou需要多少cpucores
–executor-memory 默认1G,Memory per executor (e.g. 1000M, 2G) (Default: 1G) ,每个executour的内存
–queue QUEUE_NAME The YARN queue to submit to (Default: “default”).

MAIN函数代码执行

  • Driver端负责申请资源包括关闭资源,负责任务的Stage的切分
  • Executor执行任务的计算
  • 一个Spark的Application有很多Job
  • 一个Job下面有很多Stage
  • 一个Stage有很多taskset
  • 一个Taskset有很多task任务构成的额
  • 一个rdd分task分区任务都需要executor的task线程执行计算

再续 Spark 应用

[了解]PySpark角色分析

  • Spark的任务执行的流程
  • 面试的时候按照Spark完整的流程执行即可
  • Py4J–Python For Java–可以在Python中调用Java的方法
  • 因为Python作为顶层的语言,作为API完成Spark计算任务,底层实质上还是Scala语言调用的
  • 底层有Python的SparkContext转化为Scala版本的SparkContext
  • ****为了能在Executor端运行用户定义的Python函数或Lambda表达****式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。

[了解]PySpark架构


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
61 4
|
15天前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
1月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
61 1
|
1月前
|
机器学习/深度学习 数据可视化 大数据
驾驭股市大数据:Python实战指南
【10月更文挑战第1天】随着信息技术的发展,投资者现在能够访问到前所未有的海量金融数据。本文将指导您如何利用Python来抓取当前股市行情的大数据,并通过分析这些数据为自己提供决策支持。我们将介绍从数据获取到处理、分析以及可视化整个流程的技术方法。
97 2
|
2月前
|
存储 大数据 索引
解锁Python隐藏技能:构建高效后缀树Suffix Tree,处理大数据游刃有余!
通过构建高效的后缀树,Python程序在处理大规模字符串数据时能够游刃有余,显著提升性能和效率。无论是学术研究还是工业应用,Suffix Tree都是不可或缺的强大工具。
54 6
|
1月前
|
大数据 关系型数据库 数据库
python 批量处理大数据写入数据库
python 批量处理大数据写入数据库
110 0
|
5天前
|
存储 数据挖掘 开发者
Python编程入门:从零到英雄
在这篇文章中,我们将一起踏上Python编程的奇幻之旅。无论你是编程新手,还是希望拓展技能的开发者,本教程都将为你提供一条清晰的道路,引导你从基础语法走向实际应用。通过精心设计的代码示例和练习,你将学会如何用Python解决实际问题,并准备好迎接更复杂的编程挑战。让我们一起探索这个强大的语言,开启你的编程生涯吧!
|
11天前
|
机器学习/深度学习 人工智能 TensorFlow
人工智能浪潮下的自我修养:从Python编程入门到深度学习实践
【10月更文挑战第39天】本文旨在为初学者提供一条清晰的道路,从Python基础语法的掌握到深度学习领域的探索。我们将通过简明扼要的语言和实际代码示例,引导读者逐步构建起对人工智能技术的理解和应用能力。文章不仅涵盖Python编程的基础,还将深入探讨深度学习的核心概念、工具和实战技巧,帮助读者在AI的浪潮中找到自己的位置。
|
11天前
|
机器学习/深度学习 数据挖掘 Python
Python编程入门——从零开始构建你的第一个程序
【10月更文挑战第39天】本文将带你走进Python的世界,通过简单易懂的语言和实际的代码示例,让你快速掌握Python的基础语法。无论你是编程新手还是想学习新语言的老手,这篇文章都能为你提供有价值的信息。我们将从变量、数据类型、控制结构等基本概念入手,逐步过渡到函数、模块等高级特性,最后通过一个综合示例来巩固所学知识。让我们一起开启Python编程之旅吧!
|
11天前
|
存储 Python
Python编程入门:打造你的第一个程序
【10月更文挑战第39天】在数字时代的浪潮中,掌握编程技能如同掌握了一门新时代的语言。本文将引导你步入Python编程的奇妙世界,从零基础出发,一步步构建你的第一个程序。我们将探索编程的基本概念,通过简单示例理解变量、数据类型和控制结构,最终实现一个简单的猜数字游戏。这不仅是一段代码的旅程,更是逻辑思维和问题解决能力的锻炼之旅。准备好了吗?让我们开始吧!
下一篇
无影云桌面