Spark Shell笔记

简介: Spark Shell笔记

学习感悟


(1)学习一定要敲,感觉很简单,但是也要敲一敲,不要眼高手低

(2)一定要懂函数式编程,一定,一定

(3)shell中的方法在scala写的项目中也会有对应的方法

(4)sc和spark是程序的入口,直接用


1.png

1.png1.png1.png1.png


2.png


SparkShell


启动SparkShell


./bin/spark-shell


WordCount案例


sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/out")


RDD创建(Shell)


从集合中创建RDD


parallelize和makeRDD


val rdd1246 = sc.parallelize(List("a","b","c"))


rdd1246.collect


3.png


val rdd1617=sc.makeRDD(List(1,List(("a","b","c")),(2,List("d","e","f"))))


 rdd1617.collect


4.png


从外部存储创建RDD


由外部存储系统的数据集创建,包括本地文件系统,还有Hadoop支持的数据集,如HDFS,HBase


sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt")


从其他RDD转换


常用的Transformation和Action(Shell)


map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成


scala> var rdd1638  = sc.parallelize(1 to 10)
scala> rdd1638.collect
scala> rdd1638.map(_*2).collect


6.png


filter(func):返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成


scala> var rdd1643 =sc.parallelize(1 to 10)
scala> rdd1643.filter(_>5).collect


7.png


flatMap(func):类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)


注意:func 必须是将一个数据映射为0或多个输出元素


通俗点说:一个数据通过func函数产生的集合压平


val rdd3=sc.makeRDD(List("hello1_hello2_hello3","hello4_hello5"))
scala> rdd3.flatMap(_.split("_")).collect


8.png


sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽 出的数据是否放回,true 为有放回的抽样, false 为无放回的抽样,seed 用于指定随机 数生成器种子。例子从 RDD 中随机且有放 回的抽出 50%的数据,随机种子值为 3(即 可能以 1 2 3 的其中一个起始值)


scala> val rdd5 = sc.makeRDD(List(1,2,3,4,5,6,7))
scala> rdd5.sample(false,0.2,3).collect


9.png


takeSample:和 Sample 的区别是:takeSample 返回的是最终的结果集合。


10.png


union(otherDataset):对源 RDD 和参数 RDD 求并集后返回一个 新的 RDD


intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD


intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD


11.png


distinct([numTasks])):对源 RDD 进行去重后返回一个新的 RDD. 默认情况下,只有 8 个并行任务来操作, 但是可以传入一个可选的 numTasks 参数 改变它。


rdd3 = sc.makeRDD(List(1,1,2,3,4,4,5))
rdd3.distinct(2).collect


12.png


reduceByKey(func, [numTasks]):在一个(K,V)的 RDD 上调用,返回一个 (K,V)的 RDD,使用指定的 reduce 函数, 将相同 key 的值聚合到一起,reduce 任务 的个数可以通过第二个可选的参数来设置


13.png


groupByKey:groupByKey 也是对每个 key 进行操作,但只生成 一个 sequence。


14.png


sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序 的(K,V)的 RDD


15.png


sortBy(func,[ascending], [numTasks]):与 sortByKey 类似,但是更灵活,可以用 func 先对数据进行处理,按照处理后的数 据比较结果排序。


16.png


join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个相同 key 对应的所有元素对在一起 的(K,(V,W))的 RDD


17.png


cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个(K,(Iterable,Iterable))类型 的 RDD


18.png


cartesian(otherDataset):笛卡尔积


19.png


coalesce(numPartitions):缩减分区数,用于大数据集过滤后,提高 小数据集的执行效率。


repartition(numPartitions):根据分区数,从新通过网络随机洗牌所有 数据。


glom:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]


20.png


subtract:计算差的一种函数去除两个 RDD 中相同的 元素,不同的 RDD 将保留下来


21.png


mapValues:针对于(K,V)形式的类型只对 V 进行操作


22.png


reduce(func):通过 func 函数聚集 RDD 中的所有元素, 这个功能必须是可交换且可并联的


23.png


collect():在驱动程序中,以数组的形式返回数据 集的所有元素


24.png


count():返回 RDD 的元素个数


first():返回 RDD 的第一个元素(类似于 take(1))


take(n);返回一个由数据集的前 n 个元素组成的 数组


takeOrdered(n):返回前几个的排序


25.png


saveAsTextFile(path):将数据集的元素以 textfile 的形式保存 到 HDFS 文件系统或者其他支持的文件 系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文 本


saveAsSequenceFile(path):将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录 下,可以使 HDFS 或者其他 Hadoop 支 持的文件系统。


saveAsObjectFile(path):用于将 RDD 中的元素序列化成对象, 存储到文件中。


countByKey();针对(K,V)类型的 RDD,返回一个 (K,Int)的 map,表示每一个 key 对应的 元素个数。


26.png


数据读取与保存主要方式(Shell)


文本文件输入输出


val rdd1 =sc.textFile("hdfs://Master:9000/cbeann/README.txt")


 rdd.saveAsTextFile("hdfs://Master:9000/cbeann/README2.txt")


JSON 、CSV文件输入输出(Shell)


先通过文本文件读入,然后通过fastjson等第三方库解析字符串为自定义的类型

先将自定义的类型通过第三方库转换为字符串,在同文本文件的形式保存到RDD中


SequenceFile 文件输入输出(Shell)


SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的 一种平面文件(Flat File)。


 val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))


data.saveAsSequenceFile("hdfs://Master:9000/cbeann/seq")


val sdata = sc.sequenceFile[Int,String]("hdfs://Master:9000/cbeann/seq/p*")


对象文件输入输出(Shell)


val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))


data.saveAsObjectFile("hdfs://master01:9000/objfile")


val objrdd:RDD[(Int,String)] = sc.objectFile[(Int,String)]("hdfs://master01:9000/objfile/p*")


Spark SQL(Shell)


启动SparkShell


./bin/spark-shell


读取数据,创建DataFrame

我的hdfs上/cbeann/person.json


 {  "name": "王小二",   "age": 15}
 {  "name": "王小三",   "age": 25}
 {  "name": "王小四",   "age": 35}


 val df = spark.read.json("hdfs://Master:9000/cbeann/person.json")


df.show


27.png


将数据注册一张表,表名为 people


df.createOrReplaceTempView("people")


28.png


发送SQL


spark.sql("select * from people where age > 16").show


29.png


或者


30.png


RDD、DataFrame、DataSet之间的转化(Shell)


RDD-》DataFrame


val rdd = sc.makeRDD(List(("zhangsan",11),("lisi",13)))
rdd.toDF("name","age").show


或者


val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
case class Person(name:String, age:Int)
 val df =  rdd.map(x=>Person(x._1,x._2.toInt)).toDF


31.png


32.png


DataFrame-》RDD


val rdd1 = df.rdd


33.png


RDD-》DataSet


val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))


val ds = rdd.toDS


或者


val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))


case class Person(name:String, age:Int)


 rdd.map(x=>Person(x._1,x._2.toInt)).toDS


34.png

35.png


DataSet-》RDD


ds.rdd


36.png

DataFrame》DataSet


scala> val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
scala> val df = rdd.toDF("name","age")
scala> case class Person(name:String, age:Int)
scala> val ds = df.as[Person]
scala> ds.collect


37.png


DataSet-》DataFrame


ds.toDF


38.png


SparkSQl输入输出(Shell)


val personDF= spark.read.format("json").load("hdfs://Master:9000/cbeann/person.json")


等价于


 val personDF1= spark.read.json("hdfs://Master:9000/cbeann/person.json")


相同的用法还有parquet,csv,text,jdbc


personDF1.write.format("json").save("hdfs://Master:9000/cbeann/person")


等价于与


personDF1.write.json("hdfs://Master:9000/cbeann/person1")


相同的用法还有parquet,csv,text,jdbc

目录
相关文章
|
8月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
169 0
|
8月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
772 0
|
SQL 分布式计算 HIVE
pyspark笔记(RDD,DataFrame和Spark SQL)1
pyspark笔记(RDD,DataFrame和Spark SQL)
141 1
|
7月前
|
机器学习/深度学习 Unix Java
技术笔记:Linux之Shell脚本编程(一)
技术笔记:Linux之Shell脚本编程(一)
69 0
|
7月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
52 0
|
8月前
|
存储 运维 Java
Linux笔记02 —— Shell补充
Linux笔记02 —— Shell补充
78 2
|
8月前
|
安全 Linux Shell
Linux笔记01 —— Linux初识与Shell汇总(请配合另一篇《Linux笔记02》一起使用)
Linux笔记01 —— Linux初识与Shell汇总(请配合另一篇《Linux笔记02》一起使用)
66 1
|
存储 Java Shell
Shell脚本入门(笔记)2
Shell脚本入门(笔记)
75 5
|
SQL 存储 分布式计算
pyspark笔记(RDD,DataFrame和Spark SQL)2
pyspark笔记(RDD,DataFrame和Spark SQL)
100 2
|
机器学习/深度学习 Java Shell
Shell脚本入门(笔记)1
Shell脚本入门(笔记)
74 1