大部分同学在Spark从入门到放弃这条路上都一般会了解两个经典程序,一个求圆周率的程序(Pi),另一个便是我们的WordCount了,前者一般用来验证集群安装的成功性,后者,则是编程的入门程序。 我第一次接触到spark程序的时候也是博客上面介绍在spark-shell上面去敲单词计数程序,然后洋洋洒洒敲完,自己都不明白里面干了啥。其实后面才知道里头的缘由,一个是函数式编程的那种简洁的语法,还一个是scala编程里面各种简写,所以啦,我们有必要去仔细研究里头数据变换的过程,然后才可以自己去写程序啦。
我们重新看下我们的程序,总共是三部分,我在图中进行了标注,如图一
图一:spark程序三大部分
三部分可以代表spark程序的主要流程,第一部分是Spark的环境初始化,一般约定俗成的几种名字,像Session、Context之类的,主要工作是处理环境初始化,给定应用设置参数,管理着整个spark作业的生命周期。第二部分其实是准备数据的工作,在Spark作业中我们还要处理的数据被抽象成RDD,然后使用RDD的算子去做我们的逻辑处理。这不,第三部分便是我们的处理逻辑了,至于后面的事情,是一些输出的工作,也是我们把结果计算出来之后的一些处理工作。整个流程下来,便是我们一个完整Spark作业的样子了。
第二步中的步骤其实是数据转换的过程,要让spark可以去算,肯定要把数据变成spark可以处理的结构的,数据内容本身没有做任何处理的。我们把第三步的计算过程,用我们比较习惯的方式一样去实现一下。步骤3中其实是分成了4部分flatMap->mapToPair->reduceByKey->collect,我们来进行步骤分解:
第一步,flatMap操作,lines还是直接使用,如图二所示,算子部分我们转化到了Step1中进行处理,collect是触发计算的动作,
图二,step1调整
具体的计算步骤我们调整到了Step1中去实现,如图三所示
图三,step1中的计算过程
ok,我们这样子其实也可以很方便看出原来代码中的逻辑(s -> Arrays.asList(SPACE.split(s)).iterator())的结构,这里其实是首先使用了匿名内部类去实现了逻辑,然后这个匿名内部类又是用lambda的表达式方式去实现的,我们还原成这部分逻辑,加上我们的输出,执行一下,我们观察结果,如图四:
图三,step1中的计算结果
可以很直白看出,第一次计算执行了两次,对于我们输入数据数据的大小,实际就是对应处理的每一行数据,输入之后我们根据空格进行切割,切割之后返回的是按照单词序列构成的一个数组。
第二步, 第二步的操作其实是在第一步的返回结果之上进行操作,我们可以用变量获得第一次操作的返回结果,我们调整代码如下,如图四:
图四,step1中的计算结果也是一个rdd
发现了么,step中返回的结果也是Rdd,step2计算之后还是一个rdd,换了个名字, 实际情况是在每一次的transformation算子执行之后返回出来的算子都是Rdd, 这个也是我们的spark程序可以不断一直点下去的原因。程序变得复杂的时候,我们可以看到rdd乱舞,一路狂点去,然后最后自己也不知道干了啥。。
我们看看step2中的具体实现:
图五,step2中的计算实现
每一个算子,我们需要关注几个事情:首先要关心的函数的输入类型,函数的返回类型。输入类型就是你本次处理要的数据类型,这部分是由这次要处理结果的Rdd类型决定的,我们在rdd1中返回的是String类型的迭代器,那么在step2中处理的自然就是string类型了。其次要理解的就是返回的类型,这部分其实是我们计算完成之后处理的结果。这次step2中返回的结果是Tuple2<String,Integer> ,这里面告诉我们的是返回的是一个二元组,元组里面的类型是String和Integer,这里的处理逻辑是,把输入过来的单词,每次都计数为1。我们对应运算结果:
图六,step2中的计算实现
这里出现了第三个要关心的事情,我们需要了解计算的次数,call执行了多少次,我们的输出才会被重复多少次。跟踪第一次的结果,我们这里是执行过4次,每一次执行的内容,恰好是第一次中切割之后的结果。这一步我们要记得是返回了一对对的键值对。
第三步计算,第三步的计算逻辑是叫做reduce,这个的计算逻辑是把key相同的k,v对进行两两合并(文化人叫成归约)操作,我们在上一次刚好输出的k,v对,我们把程序改到最后一步:
图七,加入step3
我们依旧把我们的数据操作输出:
图八,step3中的计算实现
我们分析一下这个算子,输入类型是两个参数v1和v2,这是算子本身是把key相同的数据进行两两合并,合并后变成一个结果,这也是为什么叫做归约操作,我们在前面的计算中,每个单词都以key,1这样子输出,那么归约之后,key相同的数据就会被合并成一个数据,这里的reduce是求和动作,所以我们的计算结果也就是把单词的数量计算出来了,我们运行一下程序:
图九:step3中的计算结果输出
我们发现算子里面call只被执行了一次,我们之前为了把计算简化,输入的数据是
Arrays.asList("Iphone Android", "Iphone Xiaomi"); 所以只有Iphone的数据才会生成两组k,v结构,其他数据只是一对,直接就是最后结果。
我调整一下输出,把整个流程打印出来:这次我把数据处理一下,可以看到我们多个k,v的合并情况:
图十:调整输入数据
调整内容是在Iphone出现在多行中,然后出现两次的有Iphone和Andriod,Iphone有着大小写的情况,模拟脏数据的情况。我们加上结果的输出,如图十:
图十一:加上一些输出的逻辑
我们最后运行一下,执行计算过程数据是这样子的:
图十二:计算过程的输出
看似明白,好又有不大明白,运行情况乱七八糟的,但是计算结果又是我们要的,那是因为,我们看日志的时候相当于在一条线上面看程序世界,程序的输出顺序就会被我们当做执行的样子(低维生物看不到高维世界发现东西的老梗)。实际情况是程序是并行执行的,所以我们也要并行去看这个结果,我来个小图看看这个过程。
图十三:数据的拆分过程
数据在分布式的环境被并行处理,但是数据转换的过程是唯一的,正如表格那样,我们的单词计数程序数据和处理算子按照我们的处理并行的执行,我们的数据一开始就是两组数据作为输入的,从左往右看可以便是数据被一步一步的转换过程。
我们跟踪一笔数据,我们的Iphone,我标红的部分,这部分体现的是数据被转的过程。首先从文本中提取我们的 Step1(flatMap),提取之后输出的是一个数组,我们的Iphone便是数组中的元素。接下来,在step2中每个单词被我们计数(Iphone,1),后面step3中,reduceByKey算子会把key的就被汇集到一起了,这样子,我们单词计数就被计算出来了。
图十三:数据的追踪
我们从wordcount中可以看到spark程序处理的流程,是通过不断rdd中的神奇算子得到我们需要的结果,所以我们很期待更加深刻认识rdd和算子,接下来我们来聊这件事情,待续~