前言
文章内容紧跟上篇文章:一文速学-PySpark数据分析基础:Spark本地环境部署搭建
上文已经把Pyspark的环境已经部署的十分完整了,可以顺利使用spark上任意功能,但是pyspark的原理还未知晓。只有知道原理才能更好的了解程序运行的机制以及后续如果程序出错了我们也能够第一时间反应过来是哪里出现问题。故此篇文章将详细讲述PySpark程序是如何运行的以及Spark的各种功能。
自学困难、想要快速入门将来从事数据科学师、数据建模、数据分析或者是大数据分析师职业,不妨点击下面教学视频了解人工智能以及数据科学入门基础:
人工智能&机器学习和深度学习快速入门教学视频跟学:人工智能机器学习快速入门教学视频
一、基础原理
我们知道spark是用scala开发的,而scala又是基于Java语言开发的,那么spark的底层架构就是Java语言开发的。如果要使用python来进行与java之间通信转换,那必然需要通过JVM来转换。我们先看原理构建图:
从图中我们发现在python环境中我们编写的程序将以SparkContext的形式存在,Pythpn通过于Py4j建立Socket通信,通过Py4j实现在Python中调用Java的方法,将我们编写成python的SpakrContext对象通过Py4j,最终在JVM Driver中实例化为Scala的SparkContext。
那么我们再从Spark集群运行机制来看:
主节点运行Spark任务是通过SparkContext传递任务分发到各个从节点,标橙色的方框就为JVM。通过JVM中间语言与其他从节点的JVM进行通信。之后Executor通信结束之后下发Task进行执行。
此时我们再把python在每个主从节点展示出来:
这样就一目了然了:主节点的Python通过Py4j通信传递SparkContext,最后在JVM Driver上面生成SparkContxt。主节点JVM Driver与其他从节点的JVM Executor通信传输SparkContext,JVM Executor通过分解SparkContext为许多Task,给pyspark.daemon调用pyspark.work 从socket中读取要执行的python函数和数据,开始真正的数据处理逻辑。数据处理完成之后将处理结果写回socket,jvm中通过PythonRDD的read方法读取,并返回结果。最终executor将PythonRDD的执行结果上报到drive上,返回给用户。
完整了解PySpark在集群上运行的原理之后,再看上图就很容易理解了。
Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实现在Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。
二、程序运行原理
1.主节点JVM运行过程
当我们提交pyspark的任务时,会先上传python脚本以及依赖并申请资源,申请到资源后会通过PythonRunner拉起JVM。
首先PythonRunner开启Pyj4 GatewayServer,通过Java Process方式运行用户上传的Python脚本。
用户Python脚本起来后,首先会实例化Python版的SparkContext对象,并且实例化Py4j GatewayClient,连接JVM中的Py4j GatewayServer,后续在Python中调用Java的方法都是借助这个Py4j Gateway。然后通过Py4j Gateway在JVM中实例化SparkContext对象。
过上面两步后,SparkContext对象初始化完毕,与其他从节点通信。开始申请Executor资源,同时开始调度任务。用户Python脚本中定义的一系列处理逻辑最终遇到action方法后会触发Job的提交,提交Job时是直接通过Py4j调用Java的PythonRDD.runJob方法完成,映射到JVM中,会转给sparkContext.runJob方法,Job运行完成后,JVM中会开启一个本地Socket等待Python进程拉取,对应地,Python进程在调用PythonRDD.runJob后就会通过Socket去拉取结果。
2.从节点JVM运行过程
当Driver得到Executor资源时,通过CoarseGrainedExecutorBackend(其中有main方法)通信JVM,启动一些必要的服务后等待Driver的Task下发,在还没有Task下发过来时,Executor端是没有Python进程的。当收到Driver下发过来的Task后,Executor的内部运行过程如下图所示。
Executor端收到Task后,会通过launchTask运行Task,最后会调用到PythonRDD的compute方法,来处理一个分区的数据,PythonRDD的compute方法的计算流程大致分三步走:
如果不存在pyspark.deamon后台Python进程,那么通过Java Process的方式启动pyspark.deamon后台进程,注意每个Executor上只会有一个pyspark.deamon后台进程,否则,直接通过Socket连接pyspark.deamon,请求开启一个pyspark.worker进程运行用户定义的Python函数或Lambda表达式。pyspark.deamon是一个典型的多进程服务器,来一个Socket请求,fork一个pyspark.worker进程处理,一个Executor上同时运行多少个Task,就会有多少个对应的pyspark.worker进程。
紧接着会单独开一个线程,给pyspark.worker进程输入数据,pyspark.worker则会调用用户定义的Python函数或Lambda表达式处理计算。
在一边输入数据的过程中,另一边则通过Socket去拉取pyspark.worker的计算结果。
把前面运行时架构图中Executor部分单独拉出来,如下图所示,橙色部分为JVM进程,白色部分为Python进程,每个Executor上有一个公共的pyspark.deamon进程,负责接收Task请求,并fork pyspark.worker进程单独处理每个Task,实际数据处理过程中,pyspark.worker进程和JVM Task会较频繁地进行本地Socket数据通信。
三、总结
总体而言,PySpark是借助Py4j实现Python调用Java,来驱动Spark应用程序,本质上主要还是JVM runtime,Java到Python的结果返回是通过本地Socket完成。虽然这种架构保证了Spark核心代码的独立性,但是在大数据场景下,JVM和Python进程间频繁的数据通信导致其性能损耗较多,恶劣时还可能会直接卡死,所以建议对于大规模机器学习或者Streaming应用场景还是慎用PySpark,尽量使用原生的Scala/Java编写应用程序,对于中小规模数据量下的简单离线任务,可以使用PySpark快速部署提交。
自学困难、想要快速入门将来从事数据科学师、数据建模、数据分析或者是大数据分析师职业,不妨点击下面教学视频了解人工智能以及数据科学入门基础:
人工智能&机器学习和深度学习快速入门教学视频跟学:人工智能机器学习快速入门教学视频