【Spark】Spark Dataframe 对项目中的数据实现列转行操作

简介: 【Spark】Spark Dataframe 对项目中的数据实现列转行操作

文章目录


一、数据源

二、首先考虑单独两行映射

三、同理将其余隔行依次映射

四、数据进行拼接


一、数据源


转换之前先看下数据结构


多行存在空值需要过滤,不同的状态(yes、maybe、invited、no)存在多个值,需要转换成(events userid status)的状态


val df = spark.read.format("csv").option("header","true").load("file:///opt/data/event_attendees.csv")
scala> df.printSchema
root
 |-- event: string (nullable = true)
 |-- yes: string (nullable = true)
 |-- maybe: string (nullable = true)
 |-- invited: string (nullable = true)
 |-- no: string (nullable = true

image.png


二、首先考虑单独两行映射


df.filter(col("yes").isNotNull).select(col("event"),col("yes")).withColumn("userid",explode(split(col("yes")," "))).drop($"yes").withColumn("status",lit("yes")).show(3)
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|1975964455|   yes|
|1159822043| 252302513|   yes|
|1159822043|4226086795|   yes|
+----------+----------+------+
only showing top 3 rows


三、同理将其余隔行依次映射


scala> val no = df.filter(col("no").isNotNull).select(col("event"),col("no")).withColumn("userid",explode(split(col("no")," "))).drop($"no").withColumn("status",lit("no"))
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|3575574655|    no|
|1159822043|1077296663|    no|
|1186208412|1728988561|    no|
+----------+----------+------+
only showing top 3 rows
no: Unit = ()
scala> val invited = df.filter(col("invited").isNotNull).select(col("event"),col("invited")).withColumn("userid",explode(split(col("invited")," "))).drop($"invited").withColumn("status",lit("invited")).show(3)
+----------+----------+-------+
|     event|    userid| status|
+----------+----------+-------+
|1159822043|1723091036|invited|
|1159822043|3795873583|invited|
|1159822043|4109144917|invited|
+----------+----------+-------+
only showing top 3 rows
invited: Unit = ()
scala> val maybe = df.filter(col("maybe").isNotNull).select(col("event"),col("maybe")).withColumn("userid",explode(split(col("maybe")," "))).drop($"maybe").withColumn("status",lit("maybe")).show(3)
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|2733420590| maybe|
|1159822043| 517546982| maybe|
|1159822043|1350834692| maybe|
+----------+----------+------+
only showing top 3 rows
maybe: Unit = ()


四、数据进行拼接


scala> yes.union(no).union(maybe).union(invited).show()
+----------+----------+------+
|     event|    userid|status|
+----------+----------+------+
|1159822043|1975964455|   yes|
|1159822043| 252302513|   yes|
|1159822043|4226086795|   yes|
|1159822043|3805886383|   yes|
|1159822043|1420484491|   yes|
|1159822043|3831921392|   yes|
|1159822043|3973364512|   yes|
| 686467261|2394228942|   yes|
| 686467261|2686116898|   yes|
| 686467261|1056558062|   yes|
| 686467261|3792942231|   yes|
| 686467261|4143738190|   yes|
| 686467261|3422491949|   yes|
| 686467261|  96269957|   yes|
| 686467261| 643111206|   yes|
| 686467261|1267277110|   yes|
| 686467261|1602677715|   yes|
| 686467261| 175167653|   yes|
| 855842686|2406118796|   yes|
| 855842686|3550897984|   yes|
+----------+----------+------+
only showing top 20 rows
scala> yes.union(no).union(maybe).union(invited).count()
res15: Long = 11245010


目录
相关文章
|
8天前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
8天前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
11天前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
17 1
|
1月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
1月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
100 1
|
1月前
|
分布式计算 DataWorks MaxCompute
DataWorks操作报错合集之spark操作odps,写入时报错,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。