根据回显信息可知,其实在启动 Spark Shell 的时候,已经给我们实例化出了两个非常关键的对象: SparkContext 对象(sc)、SparkSession对象(spark),此处使用到的是SparkContext 对象,那么教程里的 SparkContext 对象就不用再重新执行了,直接用就可以。
2. 修改词频统计代码
修改一下教程的代码:
a、用 sc 替换 sparkContext
b、textFile 的路径需要换掉成一个存在的文件,如不存在则创建
c、加上打印到控制台语句:println("wordCountRDD")
我们实际上需要执行的是下面这几句:
val textFileRDD = sc.textFile("/home/hadoop-sny/datas/word.txt") val wordRDD = textFileRDD.flatMap(line => line.split(" ")) val pairWordRDD = wordRDD.map(word => (word, 1)) val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b) wordCountRDD.foreach(println)
代码解释:
第一行:读取一个 word.txt文件,生成一个叫 textFileRDD 的RDD
第二行: textFileRDD 调用flatMap算子,对每行进行切分操作,切割符是空格,生成内容为 wordRDD
第三行:wordRDD 对切割后的每一个单词进行map映射操作,给每一个单词映射成(word, 1)的形式,生成内容为 pairWordRDD
第四行:pairWordRDD 进行 reduceByKey 操作,根据相同的 key,对 value 进行相加操作,也就是统计操作,返回值是 wordCountRDD
第五行:打印 wordCountRDD 的内容,也就是查看统计结果,foreach 为action算子,如无Action算子,无法执行 Spark 作业。
创建一个需要统计的新文件
vi /home/hadoop-sny/datas/word.txt
添加内容:
shao shao shao nai yi yi nai hello hello hi
注意:因为我的用户名为 hadoop-sny,所以我的 ~ 表示:/home/hadoop-sny/,用户名不同,则不同,自己需要特别留意此波浪线。
3. 执行词频统计代码
执行结果如下:
其实,可以一步到位,只是不美观
sc.textFile("/home/hadoop-sny/datas/word.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b).foreach(println)
还可以更简洁点:
sc.textFile("/home/hadoop-sny/datas/word.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
0x03 Pyspark 初体验
1. 启动与关闭 Pyspark
前面的Spark Shell实际上使用的是Scala交互式Shell,实际上 Spark 也提供了一个用 Python 交互式Shell,即Pyspark。
启动:
pyspark
需要注意的是此处Spark内置的Python是2.7.5版本的,关闭也是按 Ctrl + D 即可。
2. 执行 Pyspark 并查看结果
file = sc.textFile("/home/hadoop-sny/bigdata/spark-2.2.0-bin-hadoop2.7/data/graphx/users.txt") file.count() file.first()
后面的教程主要是使用 Spark-Shell ,对于 Pyspark 大家可以自行查找资料学习:官方文档Spark Python API
0xFF 总结
- Spark Shell 入门比较简单,方便快捷,不需要开启代码编译器, 一般用于简单的测试或者简单的学习。
- 读者最好有 Spark 的相关基础与概念,不然只能操作下来而不知原理,具体可参考本博客其他相关内容。





