大数据Spark DataFrame/DataSet常用操作1

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark DataFrame/DataSet常用操作1

1 一般操作:查找和过滤

1.1 读取数据源

1.1.1读取json

使用spark.read。注意:路径默认是从HDFS,如果要读取本机文件,需要加前缀file://,如下

scala> val people = spark.read.format("json").load("file:///opt/software/data/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> people.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

spark.read.format(“json”).load(“file:///opt/software/data/people.json”)

等价于spark.read.json(“file:///opt/software/data/people.json”)

如要要读取其它格式文件,只需修改format(“json”)即可,如format(“parquet”)

1.1.2 读取Hive表

使用spark.sql。其中hive数据库名default(默认数据库名可省略),表为people

scala> val peopleDF=spark.sql("select * from default.people")
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+
scala> peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: string (nullable = true)

1.2 取数据列

取列的三种方式如下

scala> peopleDF.select("name","age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+
scala> peopleDF.select($"name",$"age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+
scala> peopleDF.select(peopleDF.col("name"),peopleDF.col("age")).show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+

注意:如果在IDEA中编辑代码,使用, 则 必 须 增 加 语 句 : i m p o r t s p a r k . i m p l i c i t s . , 否 则 ,则必须增加语句:import spark.implicits._,否则,则必须增加语句:importspark.implicits.

否则表达式会报错。spark-shell默认已经导入了的

$”列名”这个是语法糖,返回Column对象

1.3 过滤算子filter(filter等价于where算子)

DF.col("id")等价于$"id",取列ColumnName

DF.filter("name=''") 过滤name等于空的行

DF.filter($"age" > 21).show() 过滤age大于21的行,必须增加语句:import spark.implicits._,否则$表达式会报错

DF.filter($"age" === 21) 取等于时必须用===,否则报错,对应的不等于是=!=。等价于DF.filter("age=21")

DF.filter("substring(name,0,1) = 'M'").show 显示name以M开头的行,其中substring是functions.scala,functions.scala包含很多函数方法,等价于DF.filter("substr(name,0,1) = 'M'").show

scala> peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: string (nullable = true)
scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+
scala> peopleDF.filter($"name" === "wangwu").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+
scala> peopleDF.filter($"name" =!= "wangwu").show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
+--------+---+--------+
scala> peopleDF.filter("age > 30").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+
scala> peopleDF.filter($"age" > 30).show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+

2 聚合操作:groupBy和agg

2.1 排序算子sort(sort等价于orderBy)

DF.sort(DF.col(“id”).desc).show 以DF中字段id降序,指定升降序的方法。另外可指定多个字段排序

=DF.sort($“id”.desc).show

DF.sort 等价于DF.orderBy

scala> peopleDF.sort($"age").show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
|  wangwu| 33| beijing|
+--------+---+--------+
scala> peopleDF.sort($"age".desc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
|zhangsan| 22| chengdu|
+--------+---+--------+
scala> peopleDF.sort($"age".asc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
|  wangwu| 33| beijing|
+--------+---+--------+
scala> peopleDF.orderBy($"age".asc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
|  wangwu| 33| beijing|
+--------+---+--------+
scala> peopleDF.orderBy($"age".desc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
|zhangsan| 22| chengdu|
+--------+---+--------+

2.2 分组函数groupBy

2.2.1 分组计数

select address,count(1) from people group by address; 等价的算子如下

scala> peopleDF.show()
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
|xiaoming| 28| beijing|
|      mm| 21| chengdu|
|xiaoming| 18| beijing|
|      mm| 11| chengdu|
+--------+---+--------+
scala> peopleDF.groupBy("address").count().show
+--------+-----+
| address|count|
+--------+-----+
| beijing|    3|
| chengdu|    3|
|shanghai|    1|
+--------+-----+

2.2.2 分组后求最值、平均值、求和的方法

//等价于select address,max(age) from people group by address;
scala> peopleDF.groupBy("address").max("age").show
+--------+--------+
| address|max(age)|
+--------+--------+
| beijing|      33|
| chengdu|      22|
|shanghai|      28|
+--------+--------+
//等价于select address,avg(age) from people group by address;
scala> peopleDF.groupBy("address").avg("age").show
+--------+------------------+
| address|          avg(age)|
+--------+------------------+
| beijing|26.333333333333332|
| chengdu|              18.0|
|shanghai|              28.0|
+--------+------------------+
//等价于select address,min(age) from people group by address;
scala> peopleDF.groupBy("address").min("age").show
+--------+--------+
| address|min(age)|
+--------+--------+
| beijing|      18|
| chengdu|      11|
|shanghai|      28|
+--------+--------+
//等价于select address,sum(age) from people group by address;
scala> peopleDF.groupBy("address").sum("age").show
+--------+--------+
| address|sum(age)|
+--------+--------+
| beijing|      79|
| chengdu|      54|
|shanghai|      28|
+--------+--------+

2.2.3 分组后,求多个聚合值(最值、平均值等)。使用算子groupBy+agg

//等价于select address,count(age),max(age),min(age),avg(age),sum(age) from people group by address;
scala> peopleDF.groupBy("address").agg(count("age"),max("age"),min("age"),avg("age"),sum("age")).show
+--------+----------+--------+--------+------------------+--------+
| address|count(age)|max(age)|min(age)|          avg(age)|sum(age)|
+--------+----------+--------+--------+------------------+--------+
| beijing|         3|      33|      18|26.333333333333332|      79|
| chengdu|         3|      22|      11|              18.0|      54|
|shanghai|         1|      28|      28|              28.0|      28|
+--------+----------+--------+--------+------------------+--------+

2.2.4 分组聚合后取别名

scala> peopleDF.groupBy("address").agg(count("age").as("cnt"),avg("age").as("avg")).show
+--------+---+------------------+
| address|cnt|               avg|
+--------+---+------------------+
| beijing|  3|26.333333333333332|
| chengdu|  3|              18.0|
|shanghai|  1|              28.0|
+--------+---+------------------+

2.2.5 分组后行转列,使用pivot

//求同名用户在同一个地址的平均年龄
//把name的不同值作为列名
scala> peopleDF.groupBy("address").pivot("name").avg("age").show
+--------+----+----+------+--------+--------+
| address|lisi|  mm|wangwu|xiaoming|zhangsan|
+--------+----+----+------+--------+--------+
| beijing|null|null|  33.0|    23.0|    null|
| chengdu|null|16.0|  null|    null|    22.0|
|shanghai|28.0|null|  null|    null|    null|
+--------+----+----+------+--------+--------+

2.3 案例


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
17天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
45 6
|
15天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
58 2
|
16天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
56 1
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
|
27天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
34 1
|
7天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
3天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
41 7
|
3天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
13 2

热门文章

最新文章