他们是否有可能在Scala DF中展平阵列?
正如我所知,使用列并选择filed.a可行,但我不想手动指定它们。
df.printSchema()
|-- client_version: string (nullable = true)
|-- filed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
| | |-- d: string (nullable = true)
最后的df
df.printSchema()
|-- client_version: string (nullable = true)
|-- filed_a: string (nullable = true)
|-- filed_b: string (nullable = true)
|-- filed_c: string (nullable = true)
|-- filed_d: string (nullable = true)
您可以使用blast平展ArrayType列,并将嵌套的结构元素名称映射到所需的顶级列名,如下所示:
import org.apache.spark.sql.functions._
case class S(a: String, b: String, c: String, d: String)
val df = Seq(
("1.0", Seq(S("a1", "b1", "c1", "d1"))),
("2.0", Seq(S("a2", "b2", "c2", "d2"), S("a3", "b3", "c3", "d3")))
).toDF("client_version", "filed")
df.printSchema
// root
// |-- client_version: string (nullable = true)
// |-- filed: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- a: string (nullable = true)
// | | |-- b: string (nullable = true)
// | | |-- c: string (nullable = true)
// | | |-- d: string (nullable = true)
val dfFlattened = df.withColumn("filed_element", explode($"filed"))
val structElements = dfFlattened.select($"filed_element.*").columns
val dfResult = dfFlattened.select( col("client_version") +: structElements.map(
c => col(s"filed_element.$c").as(s"filed_$c")
): _*
)
dfResult.show
// +--------------+-------+-------+-------+-------+
// |client_version|filed_a|filed_b|filed_c|filed_d|
// +--------------+-------+-------+-------+-------+
// | 1.0| a1| b1| c1| d1|
// | 2.0| a2| b2| c2| d2|
// | 2.0| a3| b3| c3| d3|
// +--------------+-------+-------+-------+-------+
dfResult.printSchema
// root
// |-- client_version: string (nullable = true)
// |-- filed_a: string (nullable = true)
// |-- filed_b: string (nullable = true)
// |-- filed_c: string (nullable = true)
// |-- filed_d: string (nullable = true)
用于explode通过添加更多行来展平数组,然后select使用*符号将struct列重新置于顶部。
import org.apache.spark.sql.functions.{collect_list, explode, struct}
import spark.implicits._
val df = Seq(("1", "a", "a", "a"),
("1", "b", "b", "b"),
("2", "a", "a", "a"),
("2", "b", "b", "b"),
("2", "c", "c", "c"),
("3", "a", "a","a")).toDF("idx", "A", "B", "C")
.groupBy(("idx"))
.agg(collect_list(struct("A", "B", "C")).as("nested_col"))
df.printSchema()
// root
// |-- idx: string (nullable = true)
// |-- nested_col: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- A: string (nullable = true)
// | | |-- B: string (nullable = true)
// | | |-- C: string (nullable = true)
df.show
// +---+--------------------+
// |idx| nested_col|
// +---+--------------------+
// | 3| [[a, a, a]]|
// | 1|[[a, a, a], [b, b...|
// | 2|[[a, a, a], [b, b...|
// +---+--------------------+
val dfExploded = df.withColumn("exploded", explode($"nested_col")).drop("nested_col")
dfExploded.show
// +---+---------+
// |idx| exploded|
// +---+---------+
// | 3|[a, a, a]|
// | 1|[a, a, a]|
// | 1|[b, b, b]|
// | 2|[a, a, a]|
// | 2|[b, b, b]|
// | 2|[c, c, c]|
// +---+---------+
val finalDF = dfExploded.select("idx", "exploded.*")
finalDF.show
// +---+---+---+---+
// |idx| A| B| C|
// +---+---+---+---+
// | 3| a| a| a|
// | 1| a| a| a|
// | 1| b| b| b|
// | 2| a| a| a|
// | 2| b| b| b|
// | 2| c| c| c|
// +---+---+---+---+
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。