Apache Spark源码走读(九)如何进行代码跟读&使用Intellij idea调试Spark源码

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读。本文讲解如何进行代码跟读及使用Intellij idea调试Spark源码。

<一>如何进行代码跟读

概要

今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读。众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢?

new Throwable().printStackTrace

代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁。但有时苦于对spark系统的了解程度不深,或者对scala认识不够,一时半会之内无法找到答案,那么有没有什么简便的办法呢?

我的办法就是在日志出现的地方加入下面一句话

new Throwable().printStackTrace()

现在举一个实际的例子来说明问题。

比如我们在启动spark-shell之后,输入一句非常简单的sc.textFile("README.md"),会输出下述的log

14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms
14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms
res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13

那我很想知道是第二句日志所在的tryToPut函数是被谁调用的该怎么办?

办法就是打开MemoryStore.scala,找到下述语句

logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))

在这句话之上,添加如下语句

new Throwable().printStackTrace()

 然后,重新进行源码编译

sbt/sbt assembly

再次打开spark-shell,执行sc.textFile("README.md"),就可以得到如下输出,从中可以清楚知道tryToPut的调用者是谁

14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489
14/07/05 19:53:27 WARN MemoryStore: just show the calltrace by entering some modified code
java.lang.Throwable
  at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:182)
  at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)
  at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:699)
  at org.apache.spark.storage.BlockManager.put(BlockManager.scala:570)
  at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:821)
  at org.apache.spark.broadcast.HttpBroadcast.(HttpBroadcast.scala:52)
  at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
  at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:787)
  at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:556)
  at org.apache.spark.SparkContext.textFile(SparkContext.scala:468)
  at $line5.$read$$iwC$$iwC$$iwC$$iwC.(:13)
  at $line5.$read$$iwC$$iwC$$iwC.(:18)
  at $line5.$read$$iwC$$iwC.(:20)
  at $line5.$read$$iwC.(:22)
  at $line5.$read.(:24)
  at $line5.$read$.(:28)
  at $line5.$read$.()
  at $line5.$eval$.(:7)
  at $line5.$eval$.()
  at $line5.$eval.$print()
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:483)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:483)
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)
14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms
14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms
res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13

git同步

对代码作了修改之后,如果并不想提交代码,那该如何将最新的内容同步到本地呢?

git reset --hard
git pull origin master

 Akka消息跟踪

追踪消息的接收者是谁,相对来说比较容易,只要使用好grep就可以了,当然前提是要对actor model有一点点了解。

还是举个实例吧,我们知道CoarseGrainedSchedulerBackend会发送LaunchTask消息出来,那么谁是接收方呢?只需要执行以下脚本即可。

grep LaunchTask -r core/src/main

 从如下的输出中,可以清楚看出CoarseGrainedExecutorBackend是LaunchTask的接收方,接收到该函数之后的业务处理,只需要去看看接收方的receive函数即可。

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:    case LaunchTask(data) =>
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:        logError("Received LaunchTask command but executor was null")
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

 小结

今天的内容相对简单,没有技术含量,自己做个记述,免得时间久了,不记得。

<二>使用Intellij idea调试Spark源码

概要

上篇博文讲述了如何通过修改源码来查看调用堆栈,尽管也很实用,但每修改一次都需要编译,花费的时间不少,效率不高,而且属于侵入性的修改,不优雅。本篇讲述如何使用intellij idea来跟踪调试spark源码。

前提

本文假设开发环境是在Linux平台,并且已经安装下列软件,我个人使用的是arch linux。

  1. jdk
  2. scala
  3. sbt
  4. intellij-idea-community-edition

安装scala插件

为idea安装scala插件,具体步骤如下:

  1. 选择File->Setting

2. 选择右侧的Install Jetbrains Plugin,在弹出窗口的左侧输入scala,然后点击安装,如下图所示:

 3. scala插件安装结束,需要重启idea生效

由于idea 13已经原生支持sbt,所以无须为idea安装sbt插件。

源码下载和导入

下载源码,假设使用git同步最新的源码:

git clone https://github.com/apache/spark.git

导入Spark源码

   1. 选择File->Import Project, 在弹出的窗口中指定spark源码目录

   2. 选择项目类型为sbt project,然后点击next

   3. 在新弹出的窗口中先选中"Use auto-import",然后点击Finish

 

导入设置完成,进入漫长的等待,idea会对导入的源码进行编译,同时会生成文件索引。

如果在提示栏出现如下的提示内容"is waiting for .sbt.ivy.lock",说明该lock文件无法创建,需要手工删除,具体操作如下:

cd $HOME/.ivy2
rm *.lock

手工删除掉lock之后,重启idea,重启后会继续上次没有完成的sbt过程。

源码编译

使用idea来编译spark源码,中间会有多次出错,问题的根源是sbt/sbt gen-idea的时候并没有很好的解决依赖关系。

解决办法如下,

   1. 选择File->Project Structures

   2. 在右侧dependencies中添加新的module

选择spark-core

其它模块如streaming-twitter, streaming-kafka, streaming-flume, streaming-mqtt出错的情况解决方案与此类似。

注意Example编译报错时的处理稍有不同,在指定Dependencies的时候,不是选择Library而是选择Module dependency,在弹出的窗口中选择sql.

有关编译出错问题的解决可以看一下这个链接,http://apache-spark-user-list.1001560.n3.nabble.com/Errors-occurred-while-compiling-module-spark-streaming-zeromq-IntelliJ-IDEA-13-0-2-td1282.html

调试LogQuery

1. 选择Run->Edit configurations

2. 添加Application,注意右侧窗口中配置项内容的填写,分别为Main class, vm options, working directory, use classpath of module

-Dspark.master=local 指定Spark的运行模式,可根据需要作适当修改。

3. 至此,在Run菜单中可以发现有"Run LogQuery"一项存在,尝试运行,保证编译成功。

4. 断点设置,在源文件的左侧双击即可打上断点标记,然后点击Run->"Debug LogQuery", 大功告成,如下图所示,可以查看变量和调用堆栈了。

 

参考

  1. http://8liang.cn/intellij-idea-spark-development
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
5月前
|
存储 缓存 Java
java基础:IO流 理论与代码示例(详解、idea设置统一utf-8编码问题)
这篇文章详细介绍了Java中的IO流,包括字符与字节的概念、编码格式、File类的使用、IO流的分类和原理,以及通过代码示例展示了各种流的应用,如节点流、处理流、缓存流、转换流、对象流和随机访问文件流。同时,还探讨了IDEA中设置项目编码格式的方法,以及如何处理序列化和反序列化问题。
157 1
java基础:IO流 理论与代码示例(详解、idea设置统一utf-8编码问题)
|
5月前
|
搜索推荐 Java 数据库连接
Java|在 IDEA 里自动生成 MyBatis 模板代码
基于 MyBatis 开发的项目,新增数据库表以后,总是需要编写对应的 Entity、Mapper 和 Service 等等 Class 的代码,这些都是重复的工作,我们可以想一些办法来自动生成这些代码。
77 6
|
5月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
143 1
|
5月前
|
Java Linux 开发工具
IDEA中git提交前如何关闭code analysis以及开启格式化代码
【10月更文挑战第12天】本文介绍了在 IntelliJ IDEA 中关闭代码分析和开启代码格式化的步骤。关闭代码分析可通过取消默认启用检查或针对特定规则进行调整实现,同时可通过设置 VCS 静默模式在提交时跳过检查。开启代码格式化则需在 `Settings` 中配置 `Code Style` 规则,并通过创建 Git 钩子实现提交前自动格式化。
1607 3
|
5月前
|
Java 应用服务中间件 Maven
【终极解决方案】IDEA maven 项目修改代码不生效。
【终极解决方案】IDEA maven 项目修改代码不生效。
812 1
|
6月前
|
Linux Windows
IDEA如何查看每一行代码的提交记录(人员,时间)
【9月更文挑战第24天】在IntelliJ IDEA中,可通过安装GitToolBox插件并利用其功能来便捷地查看每行代码的提交记录,包括提交者、时间和提交信息。具体操作为:首先安装GitToolBox插件,然后在代码编辑区域将鼠标悬停于目标代码行以查看简要信息,或使用快捷键打开“Version Control”窗口查看详细提交历史。
3728 2
|
7月前
|
XML 数据格式
IDEA 行注释设置,使其不从顶格开始,让其处于代码前开始
这篇文章提供了IntelliJ IDEA中如何设置行注释不从顶格开始,而是紧接在代码前面的方法,通过访问Settings中的Code Style选项进行调整,以改善代码注释的视觉效果。
|
7月前
|
开发工具 git
成功解决 IDEA 2020 版本 代码报错不提示的几种方案
这篇文章提供了几种解决IntelliJ IDEA 2020版本中代码报错不提示问题的方案,包括通过修改文件夹权限、暂存本地更改后进行git pull,以及在git pull后应用暂存的更改并提交代码到远程仓库的方法。
|
7月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
120 0
|
7月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
289 0

推荐镜像

更多