大数据Spark DataFrame/DataSet常用操作1

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark DataFrame/DataSet常用操作1

1 一般操作:查找和过滤

1.1 读取数据源

1.1.1读取json

使用spark.read。注意:路径默认是从HDFS,如果要读取本机文件,需要加前缀file://,如下

scala> val people = spark.read.format("json").load("file:///opt/software/data/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> people.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

spark.read.format(“json”).load(“file:///opt/software/data/people.json”)

等价于spark.read.json(“file:///opt/software/data/people.json”)

如要要读取其它格式文件,只需修改format(“json”)即可,如format(“parquet”)

1.1.2 读取Hive表

使用spark.sql。其中hive数据库名default(默认数据库名可省略),表为people

scala> val peopleDF=spark.sql("select * from default.people")
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+
scala> peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: string (nullable = true)

1.2 取数据列

取列的三种方式如下

scala> peopleDF.select("name","age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+
scala> peopleDF.select($"name",$"age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+
scala> peopleDF.select(peopleDF.col("name"),peopleDF.col("age")).show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+


注意:如果在IDEA中编辑代码,使用, 则 必 须 增 加 语 句 : i m p o r t s p a r k . i m p l i c i t s . , 否 则 ,则必须增加语句:import spark.implicits._,否则,则必须增加语句:importspark.implicits.

否则表达式会报错。spark-shell默认已经导入了的

$”列名”这个是语法糖,返回Column对象

1.3 过滤算子filter(filter等价于where算子)

DF.col("id")等价于$"id",取列ColumnName

DF.filter("name=''") 过滤name等于空的行

DF.filter($"age" > 21).show() 过滤age大于21的行,必须增加语句:import spark.implicits._,否则$表达式会报错

DF.filter($"age" === 21) 取等于时必须用===,否则报错,对应的不等于是=!=。等价于DF.filter("age=21")

DF.filter("substring(name,0,1) = 'M'").show 显示name以M开头的行,其中substring是functions.scala,functions.scala包含很多函数方法,等价于DF.filter("substr(name,0,1) = 'M'").show

scala> peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: string (nullable = true)
scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+
scala> peopleDF.filter($"name" === "wangwu").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+
scala> peopleDF.filter($"name" =!= "wangwu").show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
+--------+---+--------+
scala> peopleDF.filter("age > 30").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+
scala> peopleDF.filter($"age" > 30).show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+


相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
Unix Docker 容器
Is the docker daemon running?
Is the docker daemon running?
4100 0
|
存储 运维 监控
Kubernetes 集群监控与日志管理实践
【5月更文挑战第28天】在微服务架构日益普及的当下,容器编排工具如 Kubernetes 已成为运维工作的核心。有效的集群监控和日志管理是确保系统稳定性和服务可靠性的关键。本文将深入探讨 Kubernetes 集群的监控策略,以及如何利用现有的工具进行日志收集、存储和分析,以实现对集群健康状况的实时掌握和问题快速定位。
|
Java C# Swift
Java Stream中peek和map不为人知的秘密
本文通过一个Java Stream中的示例,探讨了`peek`方法在流式处理中的应用及其潜在问题。首先介绍了`peek`的基本定义与使用,并通过代码展示了其如何在流中对每个元素进行操作而不返回结果。接着讨论了`peek`作为中间操作的懒执行特性,强调了如果没有终端操作则不会执行的问题。文章指出,在某些情况下使用`peek`可能比`map`更简洁,但也需注意其懒执行带来的影响。
560 2
Java Stream中peek和map不为人知的秘密
|
Java Maven
Maven配置仓储和配置国内镜像
Maven配置仓储和配置国内镜像
3035 0
|
SQL 人工智能 自然语言处理
社区供稿 | 3D物生成,帮你轻松造万物
最近魔搭上线了一项新能力——用手机环拍物体1min视频即可生成3D模型
|
供应链 监控 算法
ERP系统中的库存优化与库存周转率分析解析
【7月更文挑战第25天】 ERP系统中的库存优化与库存周转率分析解析
1145 1
|
SQL 运维 数据库
MSSQL性能调优实战:索引策略、查询优化与并发控制的精细操作
在Microsoft SQL Server(MSSQL)的日常运维与优化中,实现高效、稳定的数据库性能是每位数据库管理员和开发者的核心任务
|
SQL HIVE
Hive中日期处理函数的使用(date_format、date_add、date_sub、next_day)
Hive中日期处理函数的使用(date_format、date_add、date_sub、next_day)
3371 3
|
负载均衡 Java API
SpringCloud之gateway基本使用解读
SpringCloud之gateway基本使用解读
|
前端开发
Elasticsearch外网无法通过ip访问
Elasticsearch外网无法通过ip访问
1656 0