【大数据】Apache Spark入门到实战 3

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据】Apache Spark入门到实战

CheckPoint

CheckPoint可以将RDD从其依赖关系中抽出来,保存到可靠的存储系统(例如HDFS,S3等), 即它可以将数据和元数据保存到检查指向目录中。因此,在程序发生崩溃的时候,Spark可以恢复此数据,并从停止的任何地方开始。

CheckPoint分为两类:

  • 高可用CheckPoint:容错性优先。这种类型的检查点可确保数据永久存储,如存储在HDFS或其他分布式文件系统上。这也意味着数据通常会在网络中复制,这会降低检查点的运行速度。
  • 本地CheckPoint:性能优先。RDD持久保存到执行程序中的本地文件系统。因此,数据写得更快,但本地文件系统也不是完全可靠的,一旦数据丢失,工作将无法恢复。

开发人员可以使用RDD.checkpoint()方法来设置检查点。在使用检查点之前,必须使用SparkContext.setCheckpointDir(directory: String)方法设置检查点目录。

下面是一个简单的例子:

import org.apache.spark.{SparkConf, SparkContext}
object CheckpointExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local")
    val sc = new SparkContext(conf)
    // 设置 checkpoint 目录
    sc.setCheckpointDir("/tmp/checkpoint")
    val data = sc.parallelize(List(1, 2, 3, 4, 5))
    val mappedData = data.map(x => x + 1)
    val filteredData = mappedData.filter(x => x % 2 == 0)
    // 对 RDD 进行 checkpoint
    filteredData.checkpoint()
    // 触发 checkpoint
    filteredData.count()
  }
}

RDD的检查点机制就好比Hadoop将中间计算值存储到磁盘,即使计算中出现了故障,我们也可以轻松地从中恢复。通过对 RDD 启动检查点机制可以实现容错和高可用。

Persist与CheckPoint的区别

  • 位置:Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中),而 Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。
  • 生命周期:Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法,而 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。CheckPoint将RDD持久化到HDFS或本地文件夹,如果不被手动remove掉,是一直存在的,也就是说可以被下一个driver使用,而Persist不能被其他dirver使用。

Spark-Submit

详细参数说明

参数名 参数说明
—master master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local。具体指可参考下面关于Master_URL的列表
—deploy-mode 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client
—class 应用程序的主类,仅针对 java 或 scala 应用
—name 应用程序的名称
—jars 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下
—packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标
—exclude-packages 为了避免冲突 而指定不包含的 package
—repositories 远程 repository
—conf PROP=VALUE 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=256m”
—properties-file 加载的配置文件,默认为 conf/spark-defaults.conf
—driver-memory Driver内存,默认 1G
—driver-java-options 传给 driver 的额外的 Java 选项
—driver-library-path 传给 driver 的额外的库路径
—driver-class-path 传给 driver 的额外的类路径
—driver-cores Driver 的核数,默认是1。在 yarn 或者 standalone 下使用
—executor-memory 每个 executor 的内存,默认是1G
—total-executor-cores 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用
—num-executors 启动的 executor 数量。默认为2。在 yarn 下使用
—executor-core 每个 executor 的核数。在yarn或者standalone下使用

Master_URL的值

Master URL 含义
local 使用1个worker线程在本地运行Spark应用程序
local[K] 使用K个worker线程在本地运行Spark应用程序
local 使用所有剩余worker线程在本地运行Spark应用程序
spark://HOST:PORT 连接到Spark Standalone集群,以便在该集群上运行Spark应用程序
mesos://HOST:PORT 连接到Mesos集群,以便在该集群上运行Spark应用程序
yarn-client 以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。
yarn-cluster 以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。

Spark 共享变量

一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:广播变量(broadcast variable)累加器(accumulator)

广播变量

广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。说白了其实就是共享变量。

如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果使用广播变量在每个Executor中只有一份Driver端的变量副本

一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:

import org.apache.spark.{SparkConf, SparkContext}
object BroadcastExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(List(1, 2, 3, 4, 5))
    // 创建一个广播变量
    val factor = sc.broadcast(2)
    // 使用广播变量
    val result = data.map(x => x * factor.value)
    result.collect().foreach(println)
  }
}

广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改

累加器

累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。

一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始变量v中创建。运行在集群上的任务可以通过add方法或者使用+=操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用value方法来读取累加器的值。

示例代码如下:

import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AccumulatorExample")
    val sc = new SparkContext(conf)
    val accum = sc.longAccumulator("My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    println(accum.value) // 输出 10
  }
}

这个示例中,我们创建了一个名为 My Accumulator 的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 来对其进行累加。最后,我们使用 println(accum.value) 来输出累加器的值,结果为 10

我们可以利用子类AccumulatorParam创建自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个“0 值”(zero value);addInPlace方法计算两个值的和。例如,假设我们有一个Vector类代表数学上的向量,我们能够如下定义累加器:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

Spark SQL

Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。

Spark SQL的特性

  • 集成

无缝地将SQL查询与Spark程序混合。Spark SQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得可以轻松地运行SQL查询以及复杂的分析算法。

  • Hive兼容性

在现有仓库上运行未修改的Hive查询。Spark SQL重用了Hive前端和MetaStore,提供与现有Hive数据,查询和UDF的完全兼容性。只需将其与Hive一起安装即可。

  • 标准连接

通过JDBC或ODBC连接。Spark SQL包括具有行业标准JDBC和ODBC连接的服务器模式。

  • 可扩展性

对于交互式查询和长查询使用相同的引擎。Spark SQL利用RDD模型来支持中查询容错,使其能够扩展到大型作业。不要担心为历史数据使用不同的引擎。

Spark SQL 数据类型

Spark SQL 支持多种数据类型,包括数字类型、字符串类型、二进制类型、布尔类型、日期时间类型和区间类型等。

数字类型包括:

  • ByteType:代表一个字节的整数,范围是 -128 到 127¹²。
  • ShortType:代表两个字节的整数,范围是 -32768 到 32767¹²。
  • IntegerType:代表四个字节的整数,范围是 -2147483648 到 2147483647¹²。
  • LongType:代表八个字节的整数,范围是 -9223372036854775808 到 9223372036854775807¹²。
  • FloatType:代表四字节的单精度浮点数¹²。
  • DoubleType:代表八字节的双精度浮点数¹²。
  • DecimalType:代表任意精度的十进制数据,通过内部的 java.math.BigDecimal 支持。BigDecimal 由一个任意精度的整型非标度值和一个 32 位整数组成¹²。

字符串类型包括:

  • StringType:代表字符字符串值。

二进制类型包括:

  • BinaryType:代表字节序列值。

布尔类型包括:

  • BooleanType:代表布尔值。

日期时间类型包括:

  • TimestampType:代表包含字段年、月、日、时、分、秒的值,与会话本地时区相关。时间戳值表示绝对时间点。
  • DateType:代表包含字段年、月和日的值,不带时区。

区间类型包括:

  • YearMonthIntervalType (startField, endField):表示由以下字段组成的连续子集组成的年月间隔:MONTH(月份),YEAR(年份)。
  • DayTimeIntervalType (startField, endField):表示由以下字段组成的连续子集组成的日时间间隔:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天)。

复合类型包括:

  • ArrayType (elementType, containsNull):代表由 elementType 类型元素组成的序列值。containsNull 用来指明 ArrayType 中的值是否有 null 值。
  • MapType (keyType, valueType, valueContainsNull):表示包括一组键值对的值。通过 keyType 表示 key 数据的类型,通过 valueType 表示 value 数据的类型。valueContainsNull 用来指明 MapType 中的值是否有 null 值。
  • StructType (fields):表示一个拥有 StructFields (fields) 序列结构的值。
  • StructField (name, dataType, nullable):代表 StructType 中的一个字段,字段的名字通过 name 指定,dataType 指定 field 的数据类型,nullable 表示字段的值是否有 null 值。

DataFrame

DataFrame 是 Spark 中用于处理结构化数据的一种数据结构。它类似于关系数据库中的表,具有行和列。每一列都有一个名称和一个类型,每一行都是一条记录。

DataFrame 支持多种数据源,包括结构化数据文件、Hive 表、外部数据库和现有的 RDD。它提供了丰富的操作,包括筛选、聚合、分组、排序等。

DataFrame 的优点在于它提供了一种高级的抽象,使得用户可以使用类似于 SQL 的语言进行数据处理,而无需关心底层的实现细节。此外,Spark 会自动对 DataFrame 进行优化,以提高查询性能。

下面是一个使用DataFrame的代码例子:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
import spark.implicits._
val data = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
)
val df = data.toDF("name", "age")
df.show()

在这个示例中,我们首先创建了一个 SparkSession 对象,然后使用 toDF 方法将一个序列转换为 DataFrame。最后,我们使用 show 方法来显示 DataFrame 的内容。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3月前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
235 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
141 5
|
4月前
|
Oracle 大数据 数据挖掘
企业内训|大数据产品运营实战培训-某电信运营商大数据产品研发中心
本课程是TsingtaoAI专为某电信运营商的大数据产品研发中心的产品支撑组设计,旨在深入探讨大数据在电信运营商领域的应用与运营策略。通过密集的培训,从数据的本质与价值出发,系统解析大数据工具和技术的最新进展,深入剖析行业内外的实践案例。课程涵盖如何理解和评估数据、如何有效运用大数据技术、以及如何在不同业务场景中实现数据的价值转化。
87 0
|
4月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
123 1
|
4月前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
119 5
|
4月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
67 1
|
2月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
395 33
The Past, Present and Future of Apache Flink
|
4月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1028 13
Apache Flink 2.0-preview released

推荐镜像

更多