Spark SQL实战(06)-RDD与DataFrame的互操作

简介: 包含特定对象类型的 RDD 的schema。这种基于反射的方法可以使代码更简洁,在编写 Spark 应用程序时已知schema时效果很好

val spark = SparkSession.builder()

 .master("local").appName("DatasetApp")

 .getOrCreate()


Spark SQL支持两种不同方法将现有RDD转换为DataFrame:


1 反射推断

包含特定对象类型的 RDD 的schema。

这种基于反射的方法可以使代码更简洁,在编写 Spark 应用程序时已知schema时效果很好


val peopleRDD: RDD[String] = spark.sparkContext.textFile(

 "/Users/javaedge/Downloads/sparksql-train/data/people.txt")


// RDD => DF

// RDD

val peopleDF: DataFrame = peopleRDD.map(_.split(","))

 // RDD

 .map(x => People(x(0), x(1).trim.toInt))

 // DF

 .toDF()


2 通过编程接口

构造一个schema,然后将其应用到现有的 RDD。


虽然这种方法更冗长,但它允许在运行时构造 Dataset,当列及其类型直到运行时才知道时很有用。


step1

val peopleRDD: RDD[String] = spark.sparkContext.textFile(

 "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

// RDD

val peopleRowRDD: RDD[Row] = peopleRDD.map(_.split(","))

 .map(x => Row(x(0), x(1).trim.toInt))


step2

val struct = StructType(

   StructField("name", StringType, nullable = true) ::

     StructField("age", IntegerType, nullable = false) :: Nil)


step3

使用SparkSession的createDataFrame方法将RDD转换为DataFrame

val peopleDF: DataFrame = spark.createDataFrame(peopleRowRDD, struct)

peopleDF.show()

目录
相关文章
|
SQL 运维 监控
SQL查询太慢?实战讲解YashanDB SQL调优思路
本文是Meetup第十期“调优实战专场”的第二篇技术文章,上一篇《高效查询秘诀,解码YashanDB优化器分组查询优化手段》中,我们揭秘了YashanDB分组查询优化秘诀,本文将通过一个案例,助你快速上手YashanDB慢日志功能,精准定位“慢SQL”后进行优化。
|
9月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
632 2
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
466 4
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
694 15
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
414 0
【赵渝强老师】Spark RDD的缓存机制
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
2035 0
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
786 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
546 9
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
348 6