Spark2.X弹性分布式数据集(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 笔记

五、 DataFrame创建方式及功能使用

在Spark中, DataFrame是一 种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有 名称和类型。

使得SparkSQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之.上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。

反观RDD ,由于无从得知所存数据元素的具体内部结构, Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame和RDD的对比图15.png

RDD转换DataFrame:

准备数据集

val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
val rdd2 = sc.textFile("file:///opt/datas/stu1.txt")
val allRdd = rdd1.union(rdd2)

将RDD转换为DataFrame,使用toDF

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

默认列名是 _1 _2

scala> lines.printSchema
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)

设置Schema为key value

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1)).toDF("key","value")
lines: org.apache.spark.sql.DataFrame = [key: string, value: int]
scala> lines.printSchema
root
 |-- key: string (nullable = true)
 |-- value: integer (nullable = false)

显示key

scala> lines.select("key").show
+------+
|   key|
+------+
|  java|
|python|
|hadoop|
| scala|
|spring|
|  hive|
|   php|
| linux|
|   hue|
|  unix|
| spark|
| hbase|
| mysql|
|   c++|
|     c|
+------+

DataSet转换DataFrame:

准备数据集

val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" " )).map(x => (x,1))


dataSet: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

查看DataSet的数据格式

scala> dataSet.show
+------+---+
|    _1| _2|
+------+---+
|  java|  1|
|python|  1|
|hadoop|  1|
|spring|  1|
|python|  1|
|hadoop|  1|
|  java|  1|
|     c|  1|
|   c++|  1|
| hbase|  1|
| spark|  1|
| scala|  1|
| scala|  1|
|python|  1|
|  java|  1|
| linux|  1|
|  unix|  1|
|  java|  1|
|   php|  1|
| mysql|  1|
+------+---+
only showing top 20 rows

将DataSet转化为DataFrame

val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" " )).map(x => (x,1)).toDF("key","v

这时已经转化为DataFrame的格式

dataSet: org.apache.spark.sql.DataFrame = [key: string, value: int]

DataFrame操作

dataSet.select("key","value").groupBy("key").count.sort($"count".desc).show


+------+-----+
|   key|count|
+------+-----+
|  java|    4|
|python|    3|
| scala|    2|
|hadoop|    2|
|  unix|    1|
|spring|    1|
| mysql|    1|
| spark|    1|
| linux|    1|
|   hue|    1|
|   c++|    1|
| hbase|    1|
|     c|    1|
|  hive|    1|
|   php|    1|
+------+-----+


六、DataSet创建方式及功能使用


DataSet与RDD相似,但是它们不是使用Java序列化或Kryo,而是使用专用的Encoder对对象进行序列化以进行网络处理或传输。


DataSet创建方式有两种:一种是直接通过sparkSession对象读取外部数据创建,另一种是通过RDD转换。


(1)创建DataSet方式一

通过sparkSession对象读取外部数据创建DataSet:

val dataSet = spark.read.textFile("file:///opt/datas/stu.txt")
dataSet: org.apache.spark.sql.Dataset[String] = [value: string]

查看Schema信息

scala> dataSet.printSchema
root
 |-- value: string (nullable = true)

查看一下数据

scala> val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").show
+--------------------+
|               value|
+--------------------+
|  java python hadoop|
|spring python had...|
|   hbase spark scala|
|  scala python java |
| linux unix java php|
|      mysql hive hue|
+--------------------+
val lines = dataSet.flatMap(x => x.split(" ")).map(x => (x,1))

对DataSet操作

scala> lines.select("_1","_2").groupBy("_1").count.show
+------+-----+
|    _1|count|
+------+-----+
|  unix|    1|
| hbase|    1|
|spring|    1|
| mysql|    1|
| scala|    2|
| spark|    1|
|   hue|    1|
|   c++|    1|
|     c|    1|
| linux|    1|
|  java|    4|
|   php|    1|
|hadoop|    2|
|python|    3|
|  hive|    1|
+------+-----+

自定义对象的映射

case class Person(username:String,usercount:Int)
defined class Person
val lines = dataSet.flatMap(x => x.split(" ")).map(x => (x,1)).map(x => Person(x._1,x._2))

这时列名就改成了刚才我们自定义的

scala> lines.show
+--------+---------+
|username|usercount|
+--------+---------+
|    java|        1|
|  python|        1|
|  hadoop|        1|
|  spring|        1|
|  python|        1|
|  hadoop|        1|
|    java|        1|
|       c|        1|
|     c++|        1|
|   hbase|        1|
|   spark|        1|
|   scala|        1|
|   scala|        1|
|  python|        1|
|    java|        1|
|   linux|        1|
|    unix|        1|
|    java|        1|
|     php|        1|
|   mysql|        1|
+--------+---------+
only showing top 20 rows
scala> lines.select("username","usercount").groupBy("username").count.show
+--------+-----+
|username|count|
+--------+-----+
|    unix|    1|
|   hbase|    1|
|  spring|    1|
|   mysql|    1|
|   scala|    2|
|   spark|    1|
|     hue|    1|
|     c++|    1|
|       c|    1|
|   linux|    1|
|    java|    4|
|     php|    1|
|  hadoop|    2|
|  python|    3|
|    hive|    1|
+--------+-----+
scala> lines.select("username","usercount").groupBy("username").count.sort($"count".desc).show
+--------+-----+
|username|count|
+--------+-----+
|    java|    4|
|  python|    3|
|   scala|    2|
|  hadoop|    2|
|   hbase|    1|
|     hue|    1|
|    unix|    1|
|     c++|    1|
|  spring|    1|
|   mysql|    1|
|   spark|    1|
|       c|    1|
|   linux|    1|
|     php|    1|
|    hive|    1|
+--------+-----+
scala> lines.select("username","usercount").groupBy("username").count.sort($"count".desc).toDF("username","usercount").show
+--------+---------+
|username|usercount|
+--------+---------+
|    java|        4|
|  python|        3|
|   scala|        2|
|  hadoop|        2|
|   hbase|        1|
|  spring|        1|
|   spark|        1|
|       c|        1|
|     hue|        1|
|     c++|        1|
|   mysql|        1|
|   linux|        1|
|     php|        1|
|    unix|        1|
|    hive|        1|
+--------+---------+


(2)创建DataSet方式二

通过RDD转换成DataSet

scala> val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
rdd1: org.apache.spark.rdd.RDD[String] = file:///opt/datas/stu.txt MapPartitionsRDD[187] at textFile at <console>:24
scala> val dataSet2 = rdd1.toDS
dataSet2: org.apache.spark.sql.Dataset[String] = [value: string]

这时就将RDD转换成了DataSet,


(3)创建DataSet方式三

通过DataFrame转换成DataSet

scala> val dataSet2 = rdd1.toDF.toJSON
dataSet2: org.apache.spark.sql.Dataset[String] = [value: string]

通过DataFrame转换成DataSet,查看数据发现每一行是json的格式

scala> dataSet2.show
+--------------------+
|               value|
+--------------------+
|{"value":"java py...|
|{"value":"spring ...|
|{"value":"hbase s...|
|{"value":"scala p...|
|{"value":"linux u...|
|{"value":"mysql h...|
+--------------------+

可以再通过这种方式将DataSet转换为DataFrame

scala> spark.read.json(dataSet2)
res23: org.apache.spark.sql.DataFrame = [value: string]
scala> spark.read.json(dataSet2).show
+--------------------+
|               value|
+--------------------+
|  java python hadoop|
|spring python had...|
|   hbase spark scala|
|  scala python java |
| linux unix java php|
|      mysql hive hue|
+--------------------+

还有一种方式可以通过DataFrame转换成DataSet

scala> val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
scala> val df = rdd1.flatMap(x => x.split(" ")).toDF
df: org.apache.spark.sql.DataFrame = [value: string]
scala> val dataSet = df.as[String]
dataSet: org.apache.spark.sql.Dataset[String] = [value: string]
scala> dataSet.map(x => (x,1)).show
+------+---+
|    _1| _2|
+------+---+
|  java|  1|
|python|  1|
|hadoop|  1|
|spring|  1|
|python|  1|
|hadoop|  1|
|  java|  1|
|     c|  1|
|   c++|  1|
| hbase|  1|
| spark|  1|
| scala|  1|
| scala|  1|
|python|  1|
|  java|  1|
| linux|  1|
|  unix|  1|
|  java|  1|
|   php|  1|
| mysql|  1|
+------+---+
scala> dataSet.map(x => (x,1))
res28: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

此时我们发现它的列名是_1,_2,我们可以将它的列名改为我们自定义对象

scala> case class Person(username:String,usercount:Int)
defined class Person
scala> val df = rdd1.flatMap(x => x.split(" ")).map(x => (x,1)).toDF("username","usercount")
df: org.apache.spark.sql.DataFrame = [username: string, usercount: int]
scala> val dataSet = df.as[Person]
dataSet: org.apache.spark.sql.Dataset[Person] = [username: string, usercount: int]

这时列名就换过来了

scala> dataSet.show
+--------+---------+
|username|usercount|
+--------+---------+
|    java|        1|
|  python|        1|
|  hadoop|        1|
|  spring|        1|
|  python|        1|
|  hadoop|        1|
|    java|        1|
|       c|        1|
|     c++|        1|
|   hbase|        1|
|   spark|        1|
|   scala|        1|
|   scala|        1|
|  python|        1|
|    java|        1|
|   linux|        1|
|    unix|        1|
|    java|        1|
|     php|        1|
|   mysql|        1|
+--------+---------+

下面就可以对DataSet做一些操作

scala> dataSet.select("username","usercount").groupBy("username").count.show
+--------+-----+
|username|count|
+--------+-----+
|    unix|    1|
|   hbase|    1|
|  spring|    1|
|   mysql|    1|
|   scala|    2|
|   spark|    1|
|     hue|    1|
|     c++|    1|
|       c|    1|
|   linux|    1|
|    java|    4|
|     php|    1|
|  hadoop|    2|
|  python|    3|
|    hive|    1|
+--------+-----+


七、数据集之间的对比与转换(总结)


16.png17.png18.png19.png20.png21.png22.png23.png24.png

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
5月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
691 1
|
2月前
|
存储 分布式计算 数据处理
解释弹性分布式数据集(RDD)的概念
【8月更文挑战第13天】
77 4
|
2月前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
49 0
|
5月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
164 0
|
5月前
|
SQL 分布式计算 Hadoop
Spark分布式内存计算框架
Spark分布式内存计算框架
120 0
|
5月前
|
机器学习/深度学习 负载均衡 PyTorch
PyTorch分布式训练:加速大规模数据集的处理
【4月更文挑战第18天】PyTorch分布式训练加速大规模数据集处理,通过数据并行和模型并行提升训练效率。`torch.distributed`提供底层IPC与同步,适合定制化需求;`DistributedDataParallel`则简化并行过程。实际应用注意数据划分、通信开销、负载均衡及错误处理。借助PyTorch分布式工具,可高效应对深度学习的计算挑战,未来潜力无限。
|
5月前
|
存储 缓存 分布式计算
Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
【2月更文挑战第13天】Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
69 1
|
5月前
|
分布式计算 大数据 数据处理
Spark RDD(弹性分布式数据集)
Spark RDD(弹性分布式数据集)
|
2月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
73 2
基于Redis的高可用分布式锁——RedLock
|
2月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
下一篇
无影云桌面