Spark2.X弹性分布式数据集(二)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 笔记

五、 DataFrame创建方式及功能使用

在Spark中, DataFrame是一 种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有 名称和类型。

使得SparkSQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之.上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。

反观RDD ,由于无从得知所存数据元素的具体内部结构, Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame和RDD的对比图15.png

RDD转换DataFrame:

准备数据集

val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
val rdd2 = sc.textFile("file:///opt/datas/stu1.txt")
val allRdd = rdd1.union(rdd2)

将RDD转换为DataFrame,使用toDF

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

默认列名是 _1 _2

scala> lines.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)

设置Schema为key value

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1)).toDF("key","value")
lines: org.apache.spark.sql.DataFrame = [key: string, value: int]
scala> lines.printSchema
root
 |-- key: string (nullable = true)
 |-- value: integer (nullable = false)

显示key

scala> lines.select("key").show
+------+
|   key|
+------+
|  java|
|python|
|hadoop|
| scala|
|spring|
|  hive|
|   php|
| linux|
|   hue|
|  unix|
| spark|
| hbase|
| mysql|
|   c++|
|     c|
+------+

DataSet转换DataFrame:

准备数据集

val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" " )).map(x => (x,1))


dataSet: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

查看DataSet的数据格式

scala> dataSet.show
+------+---+
|    _1| _2|
+------+---+
|  java|  1|
|python|  1|
|hadoop|  1|
|spring|  1|
|python|  1|
|hadoop|  1|
|  java|  1|
|     c|  1|
|   c++|  1|
| hbase|  1|
| spark|  1|
| scala|  1|
| scala|  1|
|python|  1|
|  java|  1|
| linux|  1|
|  unix|  1|
|  java|  1|
|   php|  1|
| mysql|  1|
+------+---+
only showing top 20 rows

将DataSet转化为DataFrame

val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" " )).map(x => (x,1)).toDF("key","v

这时已经转化为DataFrame的格式

dataSet: org.apache.spark.sql.DataFrame = [key: string, value: int]

DataFrame操作

dataSet.select("key","value").groupBy("key").count.sort($"count".desc).show


+------+-----+
|   key|count|
+------+-----+
|  java|    4|
|python|    3|
| scala|    2|
|hadoop|    2|
|  unix|    1|
|spring|    1|
| mysql|    1|
| spark|    1|
| linux|    1|
|   hue|    1|
|   c++|    1|
| hbase|    1|
|     c|    1|
|  hive|    1|
|   php|    1|
+------+-----+


六、DataSet创建方式及功能使用


DataSet与RDD相似,但是它们不是使用Java序列化或Kryo,而是使用专用的Encoder对对象进行序列化以进行网络处理或传输。


DataSet创建方式有两种:一种是直接通过sparkSession对象读取外部数据创建,另一种是通过RDD转换。


(1)创建DataSet方式一

通过sparkSession对象读取外部数据创建DataSet:

val dataSet = spark.read.textFile("file:///opt/datas/stu.txt")
dataSet: org.apache.spark.sql.Dataset[String] = [value: string]

查看Schema信息

scala> dataSet.printSchema
root
 |-- value: string (nullable = true)

查看一下数据

scala> val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").show
+--------------------+
|               value|
+--------------------+
|  java python hadoop|
|spring python had...|
|   hbase spark scala|
|  scala python java |
| linux unix java php|
|      mysql hive hue|
+--------------------+
val lines = dataSet.flatMap(x => x.split(" ")).map(x => (x,1))

对DataSet操作

scala> lines.select("_1","_2").groupBy("_1").count.show
+------+-----+
|    _1|count|
+------+-----+
|  unix|    1|
| hbase|    1|
|spring|    1|
| mysql|    1|
| scala|    2|
| spark|    1|
|   hue|    1|
|   c++|    1|
|     c|    1|
| linux|    1|
|  java|    4|
|   php|    1|
|hadoop|    2|
|python|    3|
|  hive|    1|
+------+-----+

自定义对象的映射

case class Person(username:String,usercount:Int)
defined class Person
val lines = dataSet.flatMap(x => x.split(" ")).map(x => (x,1)).map(x => Person(x._1,x._2))

这时列名就改成了刚才我们自定义的

scala> lines.show
+--------+---------+
|username|usercount|
+--------+---------+
|    java|        1|
|  python|        1|
|  hadoop|        1|
|  spring|        1|
|  python|        1|
|  hadoop|        1|
|    java|        1|
|       c|        1|
|     c++|        1|
|   hbase|        1|
|   spark|        1|
|   scala|        1|
|   scala|        1|
|  python|        1|
|    java|        1|
|   linux|        1|
|    unix|        1|
|    java|        1|
|     php|        1|
|   mysql|        1|
+--------+---------+
only showing top 20 rows
scala> lines.select("username","usercount").groupBy("username").count.show
+--------+-----+
|username|count|
+--------+-----+
|    unix|    1|
|   hbase|    1|
|  spring|    1|
|   mysql|    1|
|   scala|    2|
|   spark|    1|
|     hue|    1|
|     c++|    1|
|       c|    1|
|   linux|    1|
|    java|    4|
|     php|    1|
|  hadoop|    2|
|  python|    3|
|    hive|    1|
+--------+-----+
scala> lines.select("username","usercount").groupBy("username").count.sort($"count".desc).show
+--------+-----+
|username|count|
+--------+-----+
|    java|    4|
|  python|    3|
|   scala|    2|
|  hadoop|    2|
|   hbase|    1|
|     hue|    1|
|    unix|    1|
|     c++|    1|
|  spring|    1|
|   mysql|    1|
|   spark|    1|
|       c|    1|
|   linux|    1|
|     php|    1|
|    hive|    1|
+--------+-----+
scala> lines.select("username","usercount").groupBy("username").count.sort($"count".desc).toDF("username","usercount").show
+--------+---------+
|username|usercount|
+--------+---------+
|    java|        4|
|  python|        3|
|   scala|        2|
|  hadoop|        2|
|   hbase|        1|
|  spring|        1|
|   spark|        1|
|       c|        1|
|     hue|        1|
|     c++|        1|
|   mysql|        1|
|   linux|        1|
|     php|        1|
|    unix|        1|
|    hive|        1|
+--------+---------+


(2)创建DataSet方式二

通过RDD转换成DataSet

scala> val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
rdd1: org.apache.spark.rdd.RDD[String] = file:///opt/datas/stu.txt MapPartitionsRDD[187] at textFile at <console>:24
scala> val dataSet2 = rdd1.toDS
dataSet2: org.apache.spark.sql.Dataset[String] = [value: string]

这时就将RDD转换成了DataSet,


(3)创建DataSet方式三

通过DataFrame转换成DataSet

scala> val dataSet2 = rdd1.toDF.toJSON
dataSet2: org.apache.spark.sql.Dataset[String] = [value: string]

通过DataFrame转换成DataSet,查看数据发现每一行是json的格式

scala> dataSet2.show
+--------------------+
|               value|
+--------------------+
|{"value":"java py...|
|{"value":"spring ...|
|{"value":"hbase s...|
|{"value":"scala p...|
|{"value":"linux u...|
|{"value":"mysql h...|
+--------------------+

可以再通过这种方式将DataSet转换为DataFrame

scala> spark.read.json(dataSet2)
res23: org.apache.spark.sql.DataFrame = [value: string]
scala> spark.read.json(dataSet2).show
+--------------------+
|               value|
+--------------------+
|  java python hadoop|
|spring python had...|
|   hbase spark scala|
|  scala python java |
| linux unix java php|
|      mysql hive hue|
+--------------------+

还有一种方式可以通过DataFrame转换成DataSet

scala> val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
scala> val df = rdd1.flatMap(x => x.split(" ")).toDF
df: org.apache.spark.sql.DataFrame = [value: string]
scala> val dataSet = df.as[String]
dataSet: org.apache.spark.sql.Dataset[String] = [value: string]
scala> dataSet.map(x => (x,1)).show
+------+---+
|    _1| _2|
+------+---+
|  java|  1|
|python|  1|
|hadoop|  1|
|spring|  1|
|python|  1|
|hadoop|  1|
|  java|  1|
|     c|  1|
|   c++|  1|
| hbase|  1|
| spark|  1|
| scala|  1|
| scala|  1|
|python|  1|
|  java|  1|
| linux|  1|
|  unix|  1|
|  java|  1|
|   php|  1|
| mysql|  1|
+------+---+
scala> dataSet.map(x => (x,1))
res28: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

此时我们发现它的列名是_1,_2,我们可以将它的列名改为我们自定义对象

scala> case class Person(username:String,usercount:Int)
defined class Person
scala> val df = rdd1.flatMap(x => x.split(" ")).map(x => (x,1)).toDF("username","usercount")
df: org.apache.spark.sql.DataFrame = [username: string, usercount: int]
scala> val dataSet = df.as[Person]
dataSet: org.apache.spark.sql.Dataset[Person] = [username: string, usercount: int]

这时列名就换过来了

scala> dataSet.show
+--------+---------+
|username|usercount|
+--------+---------+
|    java|        1|
|  python|        1|
|  hadoop|        1|
|  spring|        1|
|  python|        1|
|  hadoop|        1|
|    java|        1|
|       c|        1|
|     c++|        1|
|   hbase|        1|
|   spark|        1|
|   scala|        1|
|   scala|        1|
|  python|        1|
|    java|        1|
|   linux|        1|
|    unix|        1|
|    java|        1|
|     php|        1|
|   mysql|        1|
+--------+---------+

下面就可以对DataSet做一些操作

scala> dataSet.select("username","usercount").groupBy("username").count.show
+--------+-----+
|username|count|
+--------+-----+
|    unix|    1|
|   hbase|    1|
|  spring|    1|
|   mysql|    1|
|   scala|    2|
|   spark|    1|
|     hue|    1|
|     c++|    1|
|       c|    1|
|   linux|    1|
|    java|    4|
|     php|    1|
|  hadoop|    2|
|  python|    3|
|    hive|    1|
+--------+-----+


七、数据集之间的对比与转换(总结)


16.png17.png18.png19.png20.png21.png22.png23.png24.png

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
716 1
|
7天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
存储 分布式计算 数据处理
解释弹性分布式数据集(RDD)的概念
【8月更文挑战第13天】
129 4
|
3月前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
80 0
|
6月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
187 0
|
6月前
|
SQL 分布式计算 Hadoop
Spark分布式内存计算框架
Spark分布式内存计算框架
155 0
|
6月前
|
机器学习/深度学习 负载均衡 PyTorch
PyTorch分布式训练:加速大规模数据集的处理
【4月更文挑战第18天】PyTorch分布式训练加速大规模数据集处理,通过数据并行和模型并行提升训练效率。`torch.distributed`提供底层IPC与同步,适合定制化需求;`DistributedDataParallel`则简化并行过程。实际应用注意数据划分、通信开销、负载均衡及错误处理。借助PyTorch分布式工具,可高效应对深度学习的计算挑战,未来潜力无限。
|
6月前
|
存储 缓存 分布式计算
Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
【2月更文挑战第13天】Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
73 1
|
6月前
|
分布式计算 大数据 数据处理
Spark RDD(弹性分布式数据集)
Spark RDD(弹性分布式数据集)
|
22天前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?