点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节完成的内容如下:
Spark RDD的依赖关系
重回 WordCount
RDD 持久化
RDD 缓存
RDD容错机制
基本概念
涉及到的算子:checkpoint,也是Transformation
Spark中对于数据的保存除了持久化操作外,还提供了检查点的机制
检查点本质是通过RDD写入高可靠的磁盘,主要目的是为了容错。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。
Lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销
cache和checkpoint是有显著区别的,缓存把RDD计算出来然后放到内存中,但RDD的依赖链不能丢掉,当某个点某个Executor宕机了,上面cache的RDD就会丢掉,需要通过依赖链重新计算。不同的是,checkpoint是把RDD保存在HDFS中,是多副本的可靠存储,此时依赖链可以丢弃,所以斩断了依赖链。
适合场景
DAG中的Lineage过长,如果重新计算,开销会很大
在宽依赖上做checkpoint获得的收益更大
启动Shell
# 启动 spark-shell spark-shell --master local[*]
checkpoint
// 设置检查点目录 sc.setCheckpointDir("/tmp/checkpoint") val rdd1 = sc.parallelize(1 to 1000) val rdd2 = rdd1.map(_*2) rdd2.checkpoint // checkpoint是lazy操作 rdd2.isCheckpointed
可以发现,返回结果是False
RDD 依赖关系1
checkpoint之前的rdd依赖关系
- rdd2.dependencies(0).rdd
- rdd2.dependencies(0).rdd.collect
- 我们可以观察到,依赖关系是有的,关系到之前的 rdd1 的数据了:
触发checkpoint
我们可以通过执行 Action 的方式,来触发 checkpoint
执行一次action,触发checkpoint的执行
- rdd2.count
- rdd2.isCheckpointed
- 此时观察,可以发现 checkpoint 已经是 True 了:
RDD依赖关系2
我们再次观察RDD的依赖关系:
再次查看RDD的依赖关系。可以看到checkpoint后,RDD的lineage被截断,变成从checkpointRDD开始
rdd2.dependencies(0).rdd
rdd2.dependencies(0).rdd.collect
此时观察到,已经不是最开始的 rdd1 了:
查看checkpoint
我们可以查看对应的保存的文件,查看RDD所依赖的checkpoint文件
- rdd2.getCheckpointFile
运行的结果如下图:
RDD的分区
基本概念
spark.default.paralleism: 默认的并发数 2
本地模式
# 此时 spark.default.paralleism 为 N spark-shell --master local[N] # 此时 spark.default.paralleism 为 1 spark-shell --master local
伪分布式
- x为本机上启动的Executor数
- y为每个Executor使用的core数
- z为每个Executor使用的内存
- spark.default.paralleism 为 x * y
spark-shell --master local-cluster[x,y,z]
分布式模式
spark.default.paralleism = max(应用程序持有Executor的core总数, 2) • 1
创建RDD方式
集合创建
简单的说,RDD分区数等于cores总数
val rdd1 = sc.paralleize(1 to 100) rdd.getNumPartitions
textFile创建
如果没有指定分区数:
本地文件: rdd的分区数 = max(本地文件分片数,sc.defaultMinPartitions)
HDFS文件:rdd的分区数 = max(HDFS文件block数,sc.defaultMinPartitions)
需要额外注意的是:
本地文件分片数 = 本地文件大小 / 32M
- 读取 HDFS 文件,同时指定了分区数 < HDFS文件的Block数,指定的数将不会生效
val rdd = sc.textFile("data/1.txt") rdd.getNumPartitions
RDD分区器
判断分区器
以下RDD分别是否有分区器,是什么类型的分区器
val rdd1 = sc.textFile("/wcinput/wc.txt") rdd1.partitioner val rdd2 = sc.flatMap(_.split("\\s+")) rdd2.partitioner val rdd3 = rdd2.map((_, 1)) rdd3.partitioner val rdd4 = rdd3.reduceByKey(_ + _) rdd4.partitioner val rdd5 = rdd4.sortByKey() rdd5.partitioner