大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


SparkSession

RDD、DataFrame、DataSet

三者之间互相转换 详细解释

核心操作

Transformation(转换操作)

定义:

Transformation是懒执行的操作,意味着这些操作在调用时并不会立即执行计算,而是会生成一个新的数据集(或RDD),它们描述了从输入数据到输出数据的转换逻辑。Transformation的计算会被延迟,直到遇到一个Action操作时才会真正触发执行。


常见操作:


select(): 从DataFrame中选择列。

filter(): 过滤掉不符合条件的行。

join(): 连接两个DataFrame。

groupBy(): 对数据进行分组。

agg(): 聚合操作。

Action(行动操作)

定义:

Action操作会触发Spark的计算并返回结果。与Transformation不同,Action操作会执行整个计算逻辑,并产生最终的输出,如将结果写入外部存储或将数据返回给驱动程序。


常见操作:


show(): 显示DataFrame的内容。

collect(): 将DataFrame的数据收集到驱动程序上,作为本地集合返回。

count(): 计算DataFrame中的行数。

write(): 将DataFrame的数据写入外部存储(如HDFS、S3、数据库等)。

take(): 返回DataFrame的前n行数据。

Action操作

与RDD类似的操作

show

collect

collectAsList

head

first

count

take

takeAsList

reduce

与结构相关

printSchema

explain

columns

dtypes

col

生成数据

保存并上传到服务器上

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20
7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30
7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30
7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20
7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30
7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30
7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10
7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20
7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10
7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30
7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20
7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30
7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20
7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10

写入内容如下图所示:

测试运行

我们进入 spark-shell 进行测试

// 处理头,使用自动类型推断
val df1 = spark.read.option("header", true).option("infershema", "true").csv("test_spark_03.txt")

df1.count
// 缺省显示20行
df1.union(df1).show()
// 显示2行
df1.show(2)

执行结果如下图所示:

继续进行测试:

// 不截断字符
df1.toJSON.show(false)
// 显示10行 不截断字符
df1.toJSON.show(10, false)

运行结果如下图所示:

继续进行测试:

// collect 返回数组 Array[Row]
val c1 = df1.collect()
// collectAsList 返回List Lits[Row]
val c2 = df1.collectAsList()

// 返回 Row
val h1 = df1.head()
val f1 = df1.first()

// 返回 Array[Row]
val h2 = df1.head(3)
val f2 = df1.take(3)

// 返回 List[Row]
val t2 = df1.takeAsList(2)

运行结果如下图所示:

继续进行测试:

// 结构属性
// 查看列名
df1.columns
// 查看列名和类型
df1.dtypes
// 查看执行计划
df1.explain()
// 获取某个列
df1.col("ENAME")
// 常用
df1.printSchema

运行结果如下图所示:

Transformation 操作

  • RDD 类似的操作
  • 持久化/缓存 与 checkpoint

select

where

group by / 聚合

order by

join

集合操作

空值操作(函数)

函数

与RDD类似的操作

map

filter

flatMap

mapPartitions

sample

randomSplit

limt

distinct

dropDuplicates

describe

我们进行测试:

val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
// 获取第1列
df1.map(row => row.getAs[String](0)).show

// randomSplit 将DF、DS按给定参数分成多份
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count

测试结果如下图:

我们继续进行测试:

// 取10行数据生成新的Dataset
val df2 = df1.limit(10)
// distinct 去重
val df2 = df1.union(df1)
df2.distinct.count

// dropDuplicates 按列值去重
df2.dropDuplicates.show
df2.dropDuplicates("_c0").show

执行结果如下图:

存储相关

  • cacheTable
  • persist
  • checkpoint
  • unpersist
  • cache

备注:Dataset默认的存储级别是 MEMEORY_AND_DISK

spark.sparkContext.setCheckpointDir("hdfs://h121.wzk.icu:9000/checkpoint")

df1.show()
df1.checkpoint()
df1.cache()

import org.apache.spark.storage.StorageLevel
df1.persist(StorageLevel.MEMORY_ONLY)
df1.count()
df1.unpersist(true)

执行结果如下图所示:

接下篇:https://developer.aliyun.com/article/1622572

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
5月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
251 0
|
3月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
8月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
337 79
|
12月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
770 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
12月前
|
人工智能 供应链 安全
AI辅助安全测试案例某电商-供应链平台平台安全漏洞
【11月更文挑战第13天】该案例介绍了一家电商供应链平台如何利用AI技术进行全面的安全测试,包括网络、应用和数据安全层面,发现了多个潜在漏洞,并采取了有效的修复措施,提升了平台的整体安全性。
502 4
|
12月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
500 2
|
12月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
425 1
|
12月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
分布式计算 Spark Python
spark RDD transformation与action函数整理
1.创建RDD val lines = sc.parallelize(List("pandas","i like pandas")) 2.加载本地文件到RDD val linesRDD = sc.
1098 0
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
217 0