学习感悟
(1)学习一定要敲,感觉很简单,但是也要敲一敲,不要眼高手低
(2)一定要懂函数式编程,一定,一定
(3)shell中的方法在scala写的项目中也会有对应的方法
(4)sc和spark是程序的入口,直接用
SparkShell
启动SparkShell
./bin/spark-shell
WordCount案例
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/out")
RDD创建(Shell)
从集合中创建RDD
parallelize和makeRDD
val rdd1246 = sc.parallelize(List("a","b","c"))
rdd1246.collect
val rdd1617=sc.makeRDD(List(1,List(("a","b","c")),(2,List("d","e","f"))))
rdd1617.collect
从外部存储创建RDD
由外部存储系统的数据集创建,包括本地文件系统,还有Hadoop支持的数据集,如HDFS,HBase
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt")
从其他RDD转换
常用的Transformation和Action(Shell)
map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
scala> var rdd1638 = sc.parallelize(1 to 10) scala> rdd1638.collect scala> rdd1638.map(_*2).collect
filter(func):返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
scala> var rdd1643 =sc.parallelize(1 to 10) scala> rdd1643.filter(_>5).collect
flatMap(func):类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
注意:func 必须是将一个数据映射为0或多个输出元素
通俗点说:一个数据通过func函数产生的集合压平
val rdd3=sc.makeRDD(List("hello1_hello2_hello3","hello4_hello5")) scala> rdd3.flatMap(_.split("_")).collect
sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽 出的数据是否放回,true 为有放回的抽样, false 为无放回的抽样,seed 用于指定随机 数生成器种子。例子从 RDD 中随机且有放 回的抽出 50%的数据,随机种子值为 3(即 可能以 1 2 3 的其中一个起始值)
scala> val rdd5 = sc.makeRDD(List(1,2,3,4,5,6,7)) scala> rdd5.sample(false,0.2,3).collect
takeSample:和 Sample 的区别是:takeSample 返回的是最终的结果集合。
union(otherDataset):对源 RDD 和参数 RDD 求并集后返回一个 新的 RDD
intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD
intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD
distinct([numTasks])):对源 RDD 进行去重后返回一个新的 RDD. 默认情况下,只有 8 个并行任务来操作, 但是可以传入一个可选的 numTasks 参数 改变它。
rdd3 = sc.makeRDD(List(1,1,2,3,4,4,5)) rdd3.distinct(2).collect
reduceByKey(func, [numTasks]):在一个(K,V)的 RDD 上调用,返回一个 (K,V)的 RDD,使用指定的 reduce 函数, 将相同 key 的值聚合到一起,reduce 任务 的个数可以通过第二个可选的参数来设置
groupByKey:groupByKey 也是对每个 key 进行操作,但只生成 一个 sequence。
sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序 的(K,V)的 RDD
sortBy(func,[ascending], [numTasks]):与 sortByKey 类似,但是更灵活,可以用 func 先对数据进行处理,按照处理后的数 据比较结果排序。
join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个相同 key 对应的所有元素对在一起 的(K,(V,W))的 RDD
cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个(K,(Iterable,Iterable))类型 的 RDD
cartesian(otherDataset):笛卡尔积
coalesce(numPartitions):缩减分区数,用于大数据集过滤后,提高 小数据集的执行效率。
repartition(numPartitions):根据分区数,从新通过网络随机洗牌所有 数据。
glom:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]
subtract:计算差的一种函数去除两个 RDD 中相同的 元素,不同的 RDD 将保留下来
mapValues:针对于(K,V)形式的类型只对 V 进行操作
reduce(func):通过 func 函数聚集 RDD 中的所有元素, 这个功能必须是可交换且可并联的
collect():在驱动程序中,以数组的形式返回数据 集的所有元素
count():返回 RDD 的元素个数
first():返回 RDD 的第一个元素(类似于 take(1))
take(n);返回一个由数据集的前 n 个元素组成的 数组
takeOrdered(n):返回前几个的排序
saveAsTextFile(path):将数据集的元素以 textfile 的形式保存 到 HDFS 文件系统或者其他支持的文件 系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文 本
saveAsSequenceFile(path):将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录 下,可以使 HDFS 或者其他 Hadoop 支 持的文件系统。
saveAsObjectFile(path):用于将 RDD 中的元素序列化成对象, 存储到文件中。
countByKey();针对(K,V)类型的 RDD,返回一个 (K,Int)的 map,表示每一个 key 对应的 元素个数。
数据读取与保存主要方式(Shell)
文本文件输入输出
val rdd1 =sc.textFile("hdfs://Master:9000/cbeann/README.txt")
rdd.saveAsTextFile("hdfs://Master:9000/cbeann/README2.txt")
JSON 、CSV文件输入输出(Shell)
先通过文本文件读入,然后通过fastjson等第三方库解析字符串为自定义的类型
先将自定义的类型通过第三方库转换为字符串,在同文本文件的形式保存到RDD中
SequenceFile 文件输入输出(Shell)
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的 一种平面文件(Flat File)。
val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data.saveAsSequenceFile("hdfs://Master:9000/cbeann/seq")
val sdata = sc.sequenceFile[Int,String]("hdfs://Master:9000/cbeann/seq/p*")
对象文件输入输出(Shell)
val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
data.saveAsObjectFile("hdfs://master01:9000/objfile")
val objrdd:RDD[(Int,String)] = sc.objectFile[(Int,String)]("hdfs://master01:9000/objfile/p*")
Spark SQL(Shell)
启动SparkShell
./bin/spark-shell
读取数据,创建DataFrame
我的hdfs上/cbeann/person.json
{ "name": "王小二", "age": 15} { "name": "王小三", "age": 25} { "name": "王小四", "age": 35}
val df = spark.read.json("hdfs://Master:9000/cbeann/person.json")
df.show
将数据注册一张表,表名为 people
df.createOrReplaceTempView("people")
发送SQL
spark.sql("select * from people where age > 16").show
或者
RDD、DataFrame、DataSet之间的转化(Shell)
RDD-》DataFrame
val rdd = sc.makeRDD(List(("zhangsan",11),("lisi",13))) rdd.toDF("name","age").show
或者
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16))) case class Person(name:String, age:Int) val df = rdd.map(x=>Person(x._1,x._2.toInt)).toDF
DataFrame-》RDD
val rdd1 = df.rdd
RDD-》DataSet
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
val ds = rdd.toDS
或者
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
case class Person(name:String, age:Int)
rdd.map(x=>Person(x._1,x._2.toInt)).toDS
DataSet-》RDD
ds.rdd
DataFrame》DataSet
scala> val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16))) scala> val df = rdd.toDF("name","age") scala> case class Person(name:String, age:Int) scala> val ds = df.as[Person] scala> ds.collect
DataSet-》DataFrame
ds.toDF
SparkSQl输入输出(Shell)
val personDF= spark.read.format("json").load("hdfs://Master:9000/cbeann/person.json")
等价于
val personDF1= spark.read.json("hdfs://Master:9000/cbeann/person.json")
相同的用法还有parquet,csv,text,jdbc
personDF1.write.format("json").save("hdfs://Master:9000/cbeann/person")
等价于与
personDF1.write.json("hdfs://Master:9000/cbeann/person1")
相同的用法还有parquet,csv,text,jdbc