CheckPoint
CheckPoint可以将RDD从其依赖关系中抽出来,保存到可靠的存储系统(例如HDFS,S3等), 即它可以将数据和元数据保存到检查指向目录中。因此,在程序发生崩溃的时候,Spark可以恢复此数据,并从停止的任何地方开始。
CheckPoint分为两类:
- 高可用CheckPoint:容错性优先。这种类型的检查点可确保数据永久存储,如存储在HDFS或其他分布式文件系统上。这也意味着数据通常会在网络中复制,这会降低检查点的运行速度。
- 本地CheckPoint:性能优先。RDD持久保存到执行程序中的本地文件系统。因此,数据写得更快,但本地文件系统也不是完全可靠的,一旦数据丢失,工作将无法恢复。
开发人员可以使用RDD.checkpoint()
方法来设置检查点。在使用检查点之前,必须使用SparkContext.setCheckpointDir(directory: String)
方法设置检查点目录。
下面是一个简单的例子:
import org.apache.spark.{SparkConf, SparkContext} object CheckpointExample { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local") val sc = new SparkContext(conf) // 设置 checkpoint 目录 sc.setCheckpointDir("/tmp/checkpoint") val data = sc.parallelize(List(1, 2, 3, 4, 5)) val mappedData = data.map(x => x + 1) val filteredData = mappedData.filter(x => x % 2 == 0) // 对 RDD 进行 checkpoint filteredData.checkpoint() // 触发 checkpoint filteredData.count() } }
RDD的检查点机制就好比Hadoop将中间计算值存储到磁盘,即使计算中出现了故障,我们也可以轻松地从中恢复。通过对 RDD 启动检查点机制可以实现容错和高可用。
Persist与CheckPoint的区别
- 位置:Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中),而 Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。
- 生命周期:Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法,而 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。CheckPoint将RDD持久化到HDFS或本地文件夹,如果不被手动remove掉,是一直存在的,也就是说可以被下一个driver使用,而Persist不能被其他dirver使用。
Spark-Submit
详细参数说明
参数名 | 参数说明 |
—master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local。具体指可参考下面关于Master_URL的列表 |
—deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
—class | 应用程序的主类,仅针对 java 或 scala 应用 |
—name | 应用程序的名称 |
—jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
—packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 |
—exclude-packages | 为了避免冲突 而指定不包含的 package |
—repositories | 远程 repository |
—conf PROP=VALUE | 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=256m” |
—properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf |
—driver-memory | Driver内存,默认 1G |
—driver-java-options | 传给 driver 的额外的 Java 选项 |
—driver-library-path | 传给 driver 的额外的库路径 |
—driver-class-path | 传给 driver 的额外的类路径 |
—driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
—executor-memory | 每个 executor 的内存,默认是1G |
—total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 |
—num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 |
—executor-core | 每个 executor 的核数。在yarn或者standalone下使用 |
Master_URL的值
Master URL | 含义 |
local | 使用1个worker线程在本地运行Spark应用程序 |
local[K] | 使用K个worker线程在本地运行Spark应用程序 |
local | 使用所有剩余worker线程在本地运行Spark应用程序 |
spark://HOST:PORT | 连接到Spark Standalone集群,以便在该集群上运行Spark应用程序 |
mesos://HOST:PORT | 连接到Mesos集群,以便在该集群上运行Spark应用程序 |
yarn-client | 以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。 |
yarn-cluster | 以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。 |
Spark 共享变量
一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:广播变量(broadcast variable)和累加器(accumulator)。
广播变量
广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。说白了其实就是共享变量。
如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
一个广播变量可以通过调用SparkContext.broadcast(v)
方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:
import org.apache.spark.{SparkConf, SparkContext} object BroadcastExample { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local") val sc = new SparkContext(conf) val data = sc.parallelize(List(1, 2, 3, 4, 5)) // 创建一个广播变量 val factor = sc.broadcast(2) // 使用广播变量 val result = data.map(x => x * factor.value) result.collect().foreach(println) } }
广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。
累加器
累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。
一个累加器可以通过调用SparkContext.accumulator(v)
方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=
操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。
示例代码如下:
import org.apache.spark.{SparkConf, SparkContext} object AccumulatorExample { def main(args: Array[String]) { val conf = new SparkConf().setAppName("AccumulatorExample") val sc = new SparkContext(conf) val accum = sc.longAccumulator("My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) println(accum.value) // 输出 10 } }
这个示例中,我们创建了一个名为 My Accumulator
的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
来对其进行累加。最后,我们使用 println(accum.value)
来输出累加器的值,结果为 10
。
我们可以利用子类AccumulatorParam创建自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个“0 值”(zero value);addInPlace方法计算两个值的和。例如,假设我们有一个Vector类代表数学上的向量,我们能够如下定义累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
Spark SQL
Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。
Spark SQL的特性
- 集成
无缝地将SQL查询与Spark程序混合。Spark SQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得可以轻松地运行SQL查询以及复杂的分析算法。
- Hive兼容性
在现有仓库上运行未修改的Hive查询。Spark SQL重用了Hive前端和MetaStore,提供与现有Hive数据,查询和UDF的完全兼容性。只需将其与Hive一起安装即可。
- 标准连接
通过JDBC或ODBC连接。Spark SQL包括具有行业标准JDBC和ODBC连接的服务器模式。
- 可扩展性
对于交互式查询和长查询使用相同的引擎。Spark SQL利用RDD模型来支持中查询容错,使其能够扩展到大型作业。不要担心为历史数据使用不同的引擎。
Spark SQL 数据类型
Spark SQL 支持多种数据类型,包括数字类型、字符串类型、二进制类型、布尔类型、日期时间类型和区间类型等。
数字类型包括:
ByteType
:代表一个字节的整数,范围是 -128 到 127¹²。ShortType
:代表两个字节的整数,范围是 -32768 到 32767¹²。IntegerType
:代表四个字节的整数,范围是 -2147483648 到 2147483647¹²。LongType
:代表八个字节的整数,范围是 -9223372036854775808 到 9223372036854775807¹²。FloatType
:代表四字节的单精度浮点数¹²。DoubleType
:代表八字节的双精度浮点数¹²。DecimalType
:代表任意精度的十进制数据,通过内部的 java.math.BigDecimal 支持。BigDecimal 由一个任意精度的整型非标度值和一个 32 位整数组成¹²。
字符串类型包括:
StringType
:代表字符字符串值。
二进制类型包括:
BinaryType
:代表字节序列值。
布尔类型包括:
BooleanType
:代表布尔值。
日期时间类型包括:
TimestampType
:代表包含字段年、月、日、时、分、秒的值,与会话本地时区相关。时间戳值表示绝对时间点。DateType
:代表包含字段年、月和日的值,不带时区。
区间类型包括:
YearMonthIntervalType (startField, endField)
:表示由以下字段组成的连续子集组成的年月间隔:MONTH(月份),YEAR(年份)。DayTimeIntervalType (startField, endField)
:表示由以下字段组成的连续子集组成的日时间间隔:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天)。
复合类型包括:
ArrayType (elementType, containsNull)
:代表由 elementType 类型元素组成的序列值。containsNull 用来指明 ArrayType 中的值是否有 null 值。MapType (keyType, valueType, valueContainsNull)
:表示包括一组键值对的值。通过 keyType 表示 key 数据的类型,通过 valueType 表示 value 数据的类型。valueContainsNull 用来指明 MapType 中的值是否有 null 值。StructType (fields)
:表示一个拥有 StructFields (fields) 序列结构的值。StructField (name, dataType, nullable)
:代表 StructType 中的一个字段,字段的名字通过 name 指定,dataType 指定 field 的数据类型,nullable 表示字段的值是否有 null 值。
DataFrame
DataFrame 是 Spark 中用于处理结构化数据的一种数据结构。它类似于关系数据库中的表,具有行和列。每一列都有一个名称和一个类型,每一行都是一条记录。
DataFrame 支持多种数据源,包括结构化数据文件、Hive 表、外部数据库和现有的 RDD。它提供了丰富的操作,包括筛选、聚合、分组、排序等。
DataFrame 的优点在于它提供了一种高级的抽象,使得用户可以使用类似于 SQL 的语言进行数据处理,而无需关心底层的实现细节。此外,Spark 会自动对 DataFrame 进行优化,以提高查询性能。
下面是一个使用DataFrame的代码例子:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate() import spark.implicits._ val data = Seq( ("Alice", 25), ("Bob", 30), ("Charlie", 35) ) val df = data.toDF("name", "age") df.show()
在这个示例中,我们首先创建了一个 SparkSession
对象,然后使用 toDF
方法将一个序列转换为 DataFrame。最后,我们使用 show
方法来显示 DataFrame 的内容。