深入理解Spark:核心思想与源码分析. 1.2 Spark初体验

简介:

1.2 Spark初体验

本节通过Spark的基本使用,让读者对Spark能有初步的认识,便于引导读者逐步深入学习。

1.2.1 运行spark-shell

要运行spark-shell,需要先对Spark进行配置。

1)进入Spark的conf文件夹:

cd ~/install/spark-1.2.0-bin-hadoop1/conf

2)复制一份spark-env.sh.template,命名为spark-env.sh,对它进行编辑,命令如下:

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

3)添加如下配置:

export SPARK_MASTER_IP=127.0.0.1

export SPARK_LOCAL_IP=127.0.0.1

4)启动spark-shell:

cd ~/install/spark-1.2.0-bin-hadoop1/bin

./spark-shell

最后我们会看到spark启动的过程,如图1-3所示。

 

图1-3 Spark启动过程

从以上启动日志中我们可以看到SparkEnv、MapOutputTracker、BlockManagerMaster、DiskBlockManager、MemoryStore、HttpFileServer、SparkUI等信息。它们是做什么的?此处望文生义即可,具体内容将在后边的章节详细讲解。

1.2.2 执行word count

这一节,我们通过word count这个耳熟能详的例子来感受下Spark任务的执行过程。启动spark-shell后,会打开scala命令行,然后按照以下步骤输入脚本。

1)输入val lines = sc.textFile("../README.md", 2),执行结果如图1-4所示。

 

图1-4 步骤1执行结果

2)输入val words = lines.flatMap(line => line.split(" ")),执行结果如图1-5所示。

 

图1-5 步骤2执行结果

3)输入val ones = words.map(w => (w,1)),执行结果如图1-6所示。

 

图1-6 步骤3执行结果

4)输入val counts = ones.reduceByKey(_ + _),执行结果如图1-7所示。

 

图1-7 步骤4执行结果

5)输入counts.foreach(println),任务执行过程如图1-8和图1-9所示。输出结果如图1-10所示。

 

图1-8 步骤5执行过程部分(一)

 

图1-9 步骤5执行过程部分(二)

 

图1-10 步骤5输出结果

在这些输出日志中,我们先是看到Spark中任务的提交与执行过程,然后看到单词计数的输出结果,最后打印一些任务结束的日志信息。有关任务的执行分析,笔者将在第5章中展开。

1.2.3 剖析spark-shell

通过word count在spark-shell中执行的过程,我们想看看spark-shell做了什么。spark-shell中有以下一段脚本,见代码清单1-1。

代码清单1-1 spark-shell中的一段脚本

function main() {

    if $cygwin; then

stty -icanonmin 1 -echo > /dev/null 2>&1

        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"

        "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"

sttyicanon echo > /dev/null 2>&1

    else

        export SPARK_SUBMIT_OPTS

        "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"

fi

}

我们看到脚本spark-shell里执行了spark-submit脚本,打开spark-submit脚本,发现其中包含以下脚本。

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

脚本spark-submit在执行spark-class脚本时,给它增加了参数SparkSubmit。打开spark-class脚本,其中包含以下脚本,见代码清单1-2。

代码清单1-2 spark-class

if [ -n "${JAVA_HOME}" ]; then

    RUNNER="${JAVA_HOME}/bin/java"

else

    if [ `command -v java` ]; then

        RUNNER="java"

    else

       echo "JAVA_HOME is not set" >&2

       exit 1

    fi

fi

 

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

读到这里,应该知道Spark启动了以SparkSubmit为主类的jvm进程。

为便于在本地对Spark进程使用远程监控,给spark-class脚本追加以下jmx配置:

JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

在本地打开jvisualvm,添加远程主机,如图1-11所示。

右击已添加的远程主机,添加JMX连接,如图1-12所示。

 

单击右侧的“线程”选项卡,选择main线程,然后单击“线程Dump”按钮,如图1-13所示。

从dump的内容中找到线程main的信息,如代码清单1-3所示。

 

图1-13 查看Spark线程

代码清单1-3 main线程dump信息

"main" - Thread t@1

    java.lang.Thread.State: RUNNABLE

        at java.io.FileInputStream.read0(Native Method)

        at java.io.FileInputStream.read(FileInputStream.java:210)

        at scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)

        at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)

        at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.

        java:933)

        at scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)

        at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)

        at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)

        at org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.

        scala:80)

        at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive-

        Reader.scala:43)

        at org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)

        at org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:619)

        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)

        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)

        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp

        (SparkI-Loop.scala:968)

        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.

        scala:916)

        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.

        scala:916)

        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClass

        Loader.scala:135)

        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)

        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)

        at org.apache.spark.repl.Main$.main(Main.scala:31)

        at org.apache.spark.repl.Main.main(Main.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.

        java:57)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces-

        sorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:606)

        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

从main线程的栈信息中可看出程序的调用顺序:SparkSubmit.main→repl.Main→SparkI-Loop.process。SparkILoop.process方法中会调用initializeSpark方法,initializeSpark的实现见代码清单1-4。

代码清单1-4 initializeSpark的实现

def initializeSpark() {

intp.beQuietDuring {

    command("""

        @transient val sc = {

            val _sc = org.apache.spark.repl.Main.interp.createSparkContext()

            println("Spark context available as sc.")

            _sc

        }

        """)

        command("import org.apache.spark.SparkContext._")

    }

}

我们看到initializeSpark调用了createSparkContext方法,createSparkContext的实现见代码清单1-5。

代码清单1-5 createSparkContext的实现

def createSparkContext(): SparkContext = {

valexecUri = System.getenv("SPARK_EXECUTOR_URI")

valjars = SparkILoop.getAddedJars

valconf = new SparkConf()

    .setMaster(getMaster())

    .setAppName("Spark shell")

    .setJars(jars)

    .set("spark.repl.class.uri", intp.classServer.uri)

if (execUri != null) {

                      conf.set("spark.executor.uri", execUri)

    }

sparkContext = new SparkContext(conf)

    logInfo("Created spark context..")

    sparkContext

}

这里最终使用SparkConf和SparkContext来完成初始化,具体内容将在第3章讲解。代码分析中涉及的repl主要用于与Spark实时交互。

相关文章
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
2029 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
933 0
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
1288 0
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1620 0
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
897 0
|
分布式计算 Spark Hadoop
Spark2.4.0源码分析之WorldCount Stage提交(DAGScheduler)(六)
- 理解ShuffuleMapStage是如何转化为ShuffleMapTask并作为TaskSet提交 - 理解ResultStage是如何转化为ResultTask并作为TaskSet提交
1178 0
|
分布式计算 Apache Spark
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五)
理解FinalStage是如何按stage从前到后依次提交顺序
2221 0
|
缓存 分布式计算 Scala
Spark2.4.0源码分析之WorldCount Stage划分(DAGScheduler)(四)
理解FinalStage的转化(即Stage的划分)
891 0
|
分布式计算 Spark
Spark2.4.0源码分析之WorldCount 事件循环处理器(三)
理解DAG事件循环处理器处理事件流程
1042 0
|
分布式计算
Spark2.4.0源码分析之WorldCount 触发作业提交(二)
Final RDD作为参数,通过RDD.collect()函数触发作业提交
1374 0