【Spark】Spark Dataframe 对项目中的数据实现列转行操作

简介: 【Spark】Spark Dataframe 对项目中的数据实现列转行操作

文章目录


一、数据源

二、首先考虑单独两行映射

三、同理将其余隔行依次映射

四、数据进行拼接


一、数据源


转换之前先看下数据结构


多行存在空值需要过滤,不同的状态(yes、maybe、invited、no)存在多个值,需要转换成(events userid status)的状态


val df = spark.read.format("csv").option("header","true").load("file:///opt/data/event_attendees.csv")
scala> df.printSchema
root
 |-- event: string (nullable = true)
 |-- yes: string (nullable = true)
 |-- maybe: string (nullable = true)
 |-- invited: string (nullable = true)
 |-- no: string (nullable = true

image.png


二、首先考虑单独两行映射


df.filter(col("yes").isNotNull).select(col("event"),col("yes")).withColumn("userid",explode(split(col("yes")," "))).drop($"yes").withColumn("status",lit("yes")).show(3)
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|1975964455|   yes|
|1159822043| 252302513|   yes|
|1159822043|4226086795|   yes|
+----------+----------+------+
only showing top 3 rows


三、同理将其余隔行依次映射


scala> val no = df.filter(col("no").isNotNull).select(col("event"),col("no")).withColumn("userid",explode(split(col("no")," "))).drop($"no").withColumn("status",lit("no"))
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|3575574655|    no|
|1159822043|1077296663|    no|
|1186208412|1728988561|    no|
+----------+----------+------+
only showing top 3 rows
no: Unit = ()
scala> val invited = df.filter(col("invited").isNotNull).select(col("event"),col("invited")).withColumn("userid",explode(split(col("invited")," "))).drop($"invited").withColumn("status",lit("invited")).show(3)
+----------+----------+-------+
|     event|    userid| status|
+----------+----------+-------+
|1159822043|1723091036|invited|
|1159822043|3795873583|invited|
|1159822043|4109144917|invited|
+----------+----------+-------+
only showing top 3 rows
invited: Unit = ()
scala> val maybe = df.filter(col("maybe").isNotNull).select(col("event"),col("maybe")).withColumn("userid",explode(split(col("maybe")," "))).drop($"maybe").withColumn("status",lit("maybe")).show(3)
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|2733420590| maybe|
|1159822043| 517546982| maybe|
|1159822043|1350834692| maybe|
+----------+----------+------+
only showing top 3 rows
maybe: Unit = ()


四、数据进行拼接


scala> yes.union(no).union(maybe).union(invited).show()
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|1975964455|   yes|
|1159822043| 252302513|   yes|
|1159822043|4226086795|   yes|
|1159822043|3805886383|   yes|
|1159822043|1420484491|   yes|
|1159822043|3831921392|   yes|
|1159822043|3973364512|   yes|
| 686467261|2394228942|   yes|
| 686467261|2686116898|   yes|
| 686467261|1056558062|   yes|
| 686467261|3792942231|   yes|
| 686467261|4143738190|   yes|
| 686467261|3422491949|   yes|
| 686467261|  96269957|   yes|
| 686467261| 643111206|   yes|
| 686467261|1267277110|   yes|
| 686467261|1602677715|   yes|
| 686467261| 175167653|   yes|
| 855842686|2406118796|   yes|
| 855842686|3550897984|   yes|
+----------+----------+------+
only showing top 20 rows
scala> yes.union(no).union(maybe).union(invited).count()
res15: Long = 11245010


目录
相关文章
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
50 3
|
17天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
57 0
|
4月前
|
存储 分布式计算 Java
|
4月前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
251 4
|
4月前
|
存储 缓存 分布式计算
|
4月前
|
SQL 存储 分布式计算
|
4月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
51 1
|
监控 大数据 Windows
Spark-项目中分析日志的核心代码
代码 LogRecord 类: case class LogRecord ( clientIpAddress: String, rfc1413ClientIdentity: String, remoteUser: S...
1108 0
|
26天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
80 2
ClickHouse与大数据生态集成:Spark & Flink 实战

热门文章

最新文章