肝Spark源码的若干骚操作

简介: 肝Spark源码的若干骚操作

前言

我想起一句话,“不走弯路本身就是最好的捷径”。我一直相信,想要熟悉一个软件工程最彻底的办法就是肝源码。从我个人的职业生涯中,大凡这种大型点的工程,要想真正在自己电脑上跑起来总是有这样那样的问题最后导致跑不起来,最后落个从入门到放弃的下场,然后就没有然后了。为了减少此类事件,分享一些我的土方法,期望砸键盘之前可以再看看还能不能抢救一下。

编译源码

代码弄到本机器的时候,第一件事情是需要编译,参考我之前的文章 Spark3.0源码编译打包

编译成功可以告诉我们很多信息,首先,至少代码是没问题的,其次,工程需要的软件环境也是齐全的。

工程导入

编译之后的项目会把依赖库下载到本机,这个也是提前需要编译成功的原因,直接导入选择maven工程即可,因为本机有依赖库,直接就构建项目,导入之后的效果如图:

这个地方不需要不断调,如果没有成功,有以下几个建议:

1.idea的版本选择高一些,不行直接干掉下载一个最新版本

2.idea实际工程的依赖是iml文件描述的,退出idea删除这个文件,再执行导入

3.window下我也是运行过的,1.5x左右的版本确实会比较痛苦,就现在来说直接把版本干到3.0以上即可,会顺利很多

4.mac电脑和linux环境确实会顺利很多,我当年的做法是直接把笔记本电脑安装ubuntu,到了后期发育之后公司有发mac,所以经历了各种环境之后我可以淡定的说windows可以搞,mac会更好。

我自己的话spark在这一方面比hadoop和hbase之类的顺利很多。

跑代码

我们目标是需要把这个工程给跑起来,这一步非常重要,看代码和跑代码是完全不一样的两个境界,而且很多场景跑起来才知道咋回事!

我们spark也给了一些例子程序,在examples目录下面,下面一堆程序,这个我也是认为这个是spark最好的入门教材。

需要注意的是,代码本身是在分布式环境上面跑的,如果本机要跑起来需要把master指向本机就行,比如我们那个PI程序:

我们直接运行,一般情况会出现一个问题:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product

这个是maven项目中依赖的provide包没有在classpath中,我们需要加上

再次运行便可以执行了。

调试方法

Spark源码里面有一些像隐式转换,柯里化,函数类型的参数等一些语法特性,加上本身核心代码都是大师级的人写的,设计上其实比较巧妙,一开始的时候程序一下子从哪里执行到哪里都一脸懵逼,这个情况我的做法就是,直接在关键的点去增加我要打印信息代码,然后再执行,不断去探索里面的执行逻辑。

举例说明:spark在很多算子里面有withScope那么一票操作,类似这种

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
  }

很长时间比较晕,后面不断往前找

private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)

最后在RDDOperationScope中找到了实现的具体逻辑,然后我就直接去上面输入一些打印的代码:

后面去控制台上面去跟踪打印的内容,后面其实才明白其实就是记录RDD链接的路径来着:

有两个地方需要注意:

1.spark在写代码的时候是有代码格式的约束的,所以我加了scalastyle:on/off 这种注释,同时语言风格会要求符号需要空格,否则的话会提示错误,但是你明明没有错误,看着不习惯

// scalastyle:on println
   print...
   // scalastyle:off println
  1. 其实单步也是可以的,但是常用的其实是打印,原因就是spark在执行的时候其实是消息处理机制,如果长时间挂在断点代码上就会出现超时之类的现象,导致跑不完程序,这个可以灵活处理即可。

WordCount 的引入

WordCount其实是需要深入研究的,中间涉及了文件的读取,分发,shuffe等全部操作的,但是有做到了最简洁,我把我调整之后的逻辑发出来:

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("WordCount")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    val textFile = sc.
      textFile("examples/src/main/resources/people.txt")
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => {
       // println("exec:map")
        (word, 1)
      })
     .reduceByKey(_ + _)
     counts.collect().foreach(r => println(r._1 + " " + r._2))
    TimeUnit.HOURS.sleep(10)
  }
}

在最后让程序不停止,这样子可以看到我们的执行SparkUI,便于分析,注意观察日志,URL会日志中进行打印

改代码来点参与感

如果想要来一波修改源码的,找找感觉,我们直接来操作一波。

前面提到withScope我们可以增加打印,我们还是希望想看看那玩意具体用在哪里,有这想法的话,直接改了那票代码:

那个看名字是一个RDDOperationScope写进去的,我们在name后面加上for_test,然后运行起来查看UI,然后查看Stages的地方,修改之前是下面这个样子:

然后我们一波改动之后的变化,可以直接看到我们的for_test起作用了

瞬间感觉Spark源码看起来亲切了很多了!

改&&写单元测试

我们有时候期望把一段逻辑扣得比较细的时候,这种时候大规模的拉起整个项目不大合适了,事实上源码上面本身很多单元测试的案例,上有各种的上下文初始化环境准备,以及针对具体功能的测试,这个又是一大宝贵的学习资源库里,在那基础之上,我们针对性去写自己的测试代码,对特定的功能理解很有帮助。

单元测试在test目录下面,而且结构是和源代码对应起来的:

举例说明:

ParallelCollectionSplitSuite.scala中有对ParallelCollectionRDD中数据进行切片的测试,我们去看里面测试用例发现,用例的话给的是按照数据进行均匀切片

test("non-equal slices") {
    val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val slices = ParallelCollectionRDD.slice(data, 3)
    assert(slices.size === 3)
    assert(slices(0).mkString(",") === "1,2,3")
    assert(slices(1).mkString(",") === "4,5,6")
    assert(slices(2).mkString(",") === "7,8,9,10")
  }```
这样信息我们其实可以了解到,不但是会帮我门均匀划分组,而且元素的分配方式也是确定的,非切片个数的场景会被分配到最后的组。照着样子,我们增加一个测试两个切片的单元测试:
```java
test("two slice") {
    val data = Array(1, 2, 3)
    val slices = ParallelCollectionRDD.slice(data, 2)
    assert(slices.size === 2)
    assert(slices(0).mkString(",") === "1")
    assert(slices(1).mkString(",") === "2,3")
  }

然后当然是跑起来,这样一来就可以定制化去测试自己关注的内容了。

后记

整体来说当年就是不断这样子去弄就慢慢入门了,不至于跑不出来就放弃,到了自己本机可以玩的时候再慢慢迁移到分布式环境上面倒腾,大概从入门到不放弃就是这样子的吧,希望对同学们有帮助。

目录
相关文章
|
8月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
172 0
|
8月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
166 0
|
8月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
763 0
|
8月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
325 0
|
8月前
|
分布式计算 大数据 Scala
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
366 1
|
8月前
|
分布式计算 Java Hadoop
Spark3.3.0源码编译补充篇-抓狂的证书问题
Spark3.3.0源码编译补充篇-抓狂的证书问题
53 0
|
8月前
|
分布式计算 Java 程序员
Spark3.0源码编译打包
Spark3.0源码编译打包
49 0
|
8月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
117 2
|
8月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
221 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
8月前
|
机器学习/深度学习 分布式计算 搜索推荐
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
285 0