大数据进阶之路——Spark SQL小结

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 文章目录手写 WordCountRDD、DAG、 Stage、 Task 、 JobSpark 作业提交流程Spark 的 Local 和 Standalone宽依赖、窄依赖Spark SQL比 Hive 快在哪打包的注意事项

手写 WordCount

使用flatMap、reduceByKey 来计算

//sc是SparkContext对象,该对象是提交spark程序的入口
sc.textFile("file:///home/hadoop/data/hello.txt") // 读取文件,
  .flatMap(line => line.split(" "))  // 将文件中的每一行单词按照分隔符(这里是空格)分隔
  .map(word => (word,1))  //给每个单词计数为1
  .reduceByKey((x,y) => (x+y))  // 统计相同单词的数量
  .collect

简写

sc.textFile("file:///home/hadoop/data/hello.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect

RDD、DAG、 Stage、 Task 、 Job

RDD(Resilient Distributed Datasets),弹性分布式数据集

DAG(Directed Acyclic Graph),有向无环图


RDD RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。


DAG Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。


Stage 在 DAG 中又进行 Stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 Stage 又可以划分成若干 Task。接下来的事情就是 Driver 发送 Task 到 Executor,Executor 线程池去执行这些 task,完成之后将结果返回给 Driver。


Job Spark 的 Job 来源于用户执行 action 操作(这是 Spark 中实际意义的 Job),就是从 RDD 中获取结果的操作,而不是将一个 RDD 转换成另一个 RDD 的 transformation 操作。


Task 一个 Stage 内,最终的 RDD 有多少个 partition,就会产生多少个 task。


Spark 作业提交流程

image.png

spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。

TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。

Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executor

Executor 启动后,会自己反向注册到 TaskScheduler 中。所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。

每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。

DAGScheduler 会将 Job 划分为多个 Stage,然后每个 Stage 创建一个 TaskSet。

TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。

Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

Spark 的 Local 和 Standalone

Spark一共有6种运行模式:Local,Standalone,Yarn-Cluster,Yarn-Client, Mesos, Kubernetes


Local: Local 模式即单机模式,如果在命令语句中不加任何配置,则默认是 Local 模式,在本地运行。这也是部署、设置最简单的一种模式,所有的 Spark 进程都运行在一台机器或一个虚拟机上面。

Standalone: Standalone 是 Spark 自身实现的资源调度框架。如果我们只使用 Spark 进行大数据计算,不使用其他的计算框架时,就采用 Standalone 模式就够了,尤其是单用户的情况下。Standalone 模式是 Spark 实现的资源调度框架,其主要的节点有 Client 节点、Master 节点和 Worker 节点。其中 Driver 既可以运行在 Master 节点上中,也可以运行在本地 Client 端。当用 spark-shell 交互式工具提交 Spark 的 Job 时,Driver 在 Master 节点上运行;当使用 spark-submit 工具提交 Job 或者在 Eclipse、IDEA 等开发平台上使用 new SparkConf.setManager(“spark://master:7077”) 方式运行 Spark 任务时,Driver 是运行在本地 Client 端上的。

Standalone 模式的部署比较繁琐,不过官方有提供部署脚本,需要把 Spark 的部署包安装到每一台节点机器上,并且部署的目录也必须相同,而且需要 Master 节点和其他节点实现 SSH 无密码登录。启动时,需要先启动 Spark 的 Master 和 Slave 节点。提交命令类似于:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://Oscar-2.local:7077 \
  /tmp/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar \
  100

其中 master:7077是 Spark 的 Master 节点的主机名和端口号,当然集群是需要提前启动。


不管使用什么模式,Spark应用程序的代码是一模一样的,只需要在提交的时候通过–master参数来指定我们的运行模式即可


Client

Driver运行在Client端(提交Spark作业的机器)

Client会和请求到的Container进行通信来完成作业的调度和执行,Client是不能退出的

日志信息会在控制台输出:便于我们测试


Cluster

Driver运行在ApplicationMaster中

Client只要提交完作业之后就可以关掉,因为作业已经在YARN上运行了

日志是在终端看不到的,因为日志是在Driver上,只能通过yarn logs -applicationIdapplication_id

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \

此处的yarn就是我们的yarn client模式

如果是yarn cluster模式的话,yarn-cluster


Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.


如果想运行在YARN之上,那么就必须要设置HADOOP_CONF_DIR或者是YARN_CONF_DIR


1) export HADOOP_CONF_DIR=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

2) $SPARK_HOME/conf/spark-env.sh

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--executor-memory 1G \
--num-executors 1 \
/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
4

yarn logs -applicationId application_1495632775836_0002

宽依赖、窄依赖

image.png

窄依赖指的是每一个 Parent RDD 的 Partition 最多被子 RDD 的一个 Partition 使用(一子一亲)


宽依赖指的是多个子 RDD 的 Partition 会依赖同一个 parent RDD的 partition(多子一亲)


RDD 作为数据结构,本质上是一个只读的分区记录集合。一个 RDD 可以包含多个分区,每个分区就是一个数据集片段。


首先,窄依赖可以支持在同一个节点上,以 pipeline 形式执行多条命令(也叫同一个 Stage 的操作),例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。


其次,则是从失败恢复的角度考虑。窄依赖的失败恢复更有效,因为它只需要重新计算丢失的 parent partition 即可,而且可以并行地在不同节点进行重计算(一台机器太慢就会重新调度到多个节点进行)。


Spark SQL比 Hive 快在哪

当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。

由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。


Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。


消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 persist 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。


消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中 。


JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。


打包的注意事项

打包时要注意,pom.xml中需要添加如下plugin

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass></mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

注意:–files在spark中的使用


spark.read.format("parquet").load("/hiszm/clean/day=20170511/part-00000-71d465d1-7338-4016-8d1a-729504a9f95e.snappy.parquet").show(false)

mvn assembly:assembly

./bin/spark-submit \
--class com.hiszm.log.SparkStatCleanJobYARN \
--name SparkStatCleanJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /home/hadoop/lib/ipDatabase.csv,/home/hadoop/lib/ipRegion.xlsx \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/hiszm/input/* hdfs://hadoop001:8020/hiszm/clean

存储格式的选择:http://www.infoq.com/cn/articles/bigdata-store-choose/

压缩格式的选择:https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-compression-analysis/


调整并行度

./bin/spark-submit \
--class com.hiszm.log.TopNStatJobYARN \
--name TopNStatJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/hiszm/clean 20170511 
相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
18天前
|
SQL 存储 分布式计算
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
本文旨在帮助非专业数据研发但是有高频ODPS使用需求的同学们(如数分、算法、产品等)能够快速上手ODPS查询优化,实现高性能查数看数,避免日常工作中因SQL任务卡壳、失败等情况造成的工作产出delay甚至集群资源稳定性问题。
536 31
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
|
3月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
202 0
|
1月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
2月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
3月前
|
SQL 人工智能 分布式计算
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
57 0
|
5月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
434 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
275 9

热门文章

最新文章