点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
RDD的介绍
RDD的特点、特点介绍
Spark 编程模型的介绍
RDD 的创建
SparkContext
SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,它负责和整个集群的交互。
如果把Spark集群当做服务端,那么Driver就是客户端,SparkContext是客户端的核心
SparkContext是Spark对外的接口,负责向调用者提供Spark的各种功能
SparkContext用于连接Spark集群、创建RDD、累加器、广播变量
从集合创建RDD
我们在集群的节点上启动 Spark-Shell 进行学习和测试
spark-shell --master local[*] • 1
如果顺利启动,你就可以看到如下的画面:
尝试运行如下的指令,感受一下
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_412) Type in expressions to have them evaluated. Type :help for more information. scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> rdd2.getNumPartitions res1: Int = 2 scala> rdd2.partitions.length res2: Int = 2 scala> val rdd3 = sc.makeRDD(List(1,2,3,4,5)) rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24 scala> val rdd4 = sc.makeRDD(1 to 100) rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24 scala> rdd4.getNumPartitions res3: Int = 2 scala>
对应的截图如下:
从文件系统创建RDD
用 textFile() 方法来从文件系统中加载数据创建RDD,方法将文件的URI作为参数:
- 本地文件系统
- 分布式文件系统 HDFS
- Amazon S3的地址
# 本地系统 注意文件要确保存在 val lines = sc.textFile("file:///opt/wzk/1.txt") # 从分布式文件系统加载 val lines = sc.textFile("hdfs://h121.wzk.icu:9000/wcinput/wordcount.txt")
运行结果如下图所示:
从RDD创建RDD
本质是将一个RDD转换为另一个RDD,从 Transformation
Transformation
RDD的操作算子分为两类:
Transformation,用来对RDD进行转换,这个操作时延迟执行的(或者是Lazy),Transformation,返回一个新的RDD
Action,用来触发RDD的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回int、double、集合(不会返回新的RDD)
每一个Transformation操作都会产生新的RDD,供给下一个“转换”使用
转换得到RDD是惰性求值,也就是说,整个转换过程只有记录了转换的轨迹,并不会发生真正的计算,只有遇到Action操作时,才会发生真正的计算,开始从学院关系(lineage)源头开始,进行物理的转换操作。
常见转换算子1
map(func):对数据集中的每个元素都用func,然后返回一个新的RDD
filter(func):对数据集中的每个元素都是用func,然后返回一个包含使func为true的元素构成RDD
flatMap(func):与map类似,每个输入元素被映射为0或多个输出元素
mapPartitions(func):和map很像,但是map是将func作用在每个元素上,而mapPartitions是func作用在整个分区上。假设一个RDD有N个元素,M个分区(N >> M),那么map的函数将被调用N次,而mapPartitions中的函数仅被调用M次,一次处理一个分区中的所有元素
mapPartitionsWithIndex(func):与mapPartitions类似,多了分区索引值信息
转换算子1测试
map filter
测试如下的代码:
val rdd1 = sc.parallelize(1 to 10) val rdd2 = rdd1.map(_*2) val rdd3 = rdd2.filter(_>10)
执行结果如下图:
我们可以查看当前的结果,但是当前的操作都是Transformation的,并没有真正的执行。
我们需要通过 collect 触发执行,拿到最终的结果
rdd2.collect rdd3.collect
将会触发执行,可以看到结果为: