点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节完成的内容如下:
SparkSession
RDD、DataFrame、DataSet
三者之间互相转换 详细解释
核心操作
Transformation(转换操作)
定义:
Transformation是懒执行的操作,意味着这些操作在调用时并不会立即执行计算,而是会生成一个新的数据集(或RDD),它们描述了从输入数据到输出数据的转换逻辑。Transformation的计算会被延迟,直到遇到一个Action操作时才会真正触发执行。
常见操作:
select(): 从DataFrame中选择列。
filter(): 过滤掉不符合条件的行。
join(): 连接两个DataFrame。
groupBy(): 对数据进行分组。
agg(): 聚合操作。
Action(行动操作)
定义:
Action操作会触发Spark的计算并返回结果。与Transformation不同,Action操作会执行整个计算逻辑,并产生最终的输出,如将结果写入外部存储或将数据返回给驱动程序。
常见操作:
show(): 显示DataFrame的内容。
collect(): 将DataFrame的数据收集到驱动程序上,作为本地集合返回。
count(): 计算DataFrame中的行数。
write(): 将DataFrame的数据写入外部存储(如HDFS、S3、数据库等)。
take(): 返回DataFrame的前n行数据。
Action操作
与RDD类似的操作
show
collect
collectAsList
head
first
count
take
takeAsList
reduce
与结构相关
printSchema
explain
columns
dtypes
col
生成数据
保存并上传到服务器上
EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO 7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20 7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30 7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30 7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20 7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30 7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30 7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10 7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20 7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10 7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30 7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20 7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30 7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20 7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10
写入内容如下图所示:
测试运行
我们进入 spark-shell 进行测试
// 处理头,使用自动类型推断 val df1 = spark.read.option("header", true).option("infershema", "true").csv("test_spark_03.txt") df1.count // 缺省显示20行 df1.union(df1).show() // 显示2行 df1.show(2)
执行结果如下图所示:
继续进行测试:
// 不截断字符 df1.toJSON.show(false) // 显示10行 不截断字符 df1.toJSON.show(10, false)
运行结果如下图所示:
继续进行测试:
// collect 返回数组 Array[Row] val c1 = df1.collect() // collectAsList 返回List Lits[Row] val c2 = df1.collectAsList() // 返回 Row val h1 = df1.head() val f1 = df1.first() // 返回 Array[Row] val h2 = df1.head(3) val f2 = df1.take(3) // 返回 List[Row] val t2 = df1.takeAsList(2)
运行结果如下图所示:
继续进行测试:
// 结构属性 // 查看列名 df1.columns // 查看列名和类型 df1.dtypes // 查看执行计划 df1.explain() // 获取某个列 df1.col("ENAME") // 常用 df1.printSchema
运行结果如下图所示:
Transformation 操作
- RDD 类似的操作
- 持久化/缓存 与 checkpoint
select
where
group by / 聚合
order by
join
集合操作
空值操作(函数)
函数
与RDD类似的操作
map
filter
flatMap
mapPartitions
sample
randomSplit
limt
distinct
dropDuplicates
describe
我们进行测试:
val df1 = spark.read.csv("/opt/wzk/data/people1.csv") // 获取第1列 df1.map(row => row.getAs[String](0)).show // randomSplit 将DF、DS按给定参数分成多份 val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7)) df2(0).count df2(1).count df2(2).count
测试结果如下图:
我们继续进行测试:
// 取10行数据生成新的Dataset val df2 = df1.limit(10) // distinct 去重 val df2 = df1.union(df1) df2.distinct.count // dropDuplicates 按列值去重 df2.dropDuplicates.show df2.dropDuplicates("_c0").show
执行结果如下图:
存储相关
- cacheTable
- persist
- checkpoint
- unpersist
- cache
备注:Dataset默认的存储级别是 MEMEORY_AND_DISK
spark.sparkContext.setCheckpointDir("hdfs://h121.wzk.icu:9000/checkpoint") df1.show() df1.checkpoint() df1.cache() import org.apache.spark.storage.StorageLevel df1.persist(StorageLevel.MEMORY_ONLY) df1.count() df1.unpersist(true)
执行结果如下图所示: