Troubleshooting a Spark SQL union Exception

Summary: troubleshooting a type-mismatch exception from Spark SQL's union operation.
While using Spark SQL's union to merge two DataFrames, the operation kept failing with a type-mismatch error, even though on inspection the two DataFrames had exactly the same column names and column types. The error can be reproduced as follows:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DateType

object SqlTest {
  def main(args: Array[String]): Unit = {
    // Quiet Spark's own logging
    Logger.getLogger("org").setLevel(Level.ERROR)
    // Initialize the SparkSession entry point
    val session: SparkSession = SparkSession.builder.appName("name").master("local[2]").getOrCreate()
    import session.implicits._
    // Create the first DataFrame: columns (name, date, grade)
    var df1 = List[(String, String, Integer)](
      ("李奇峰", "2019-5-12", 88),
      ("李奇峰", "2019-5-12", 81),
      ("李奇峰", "2019-5-12", 82),
      ("李奇峰", "2019-5-12", 86)
    ).toDF("name", "date", "grade")
    println("Output of df1:")
    df1.show()
    // Create the second DataFrame: columns (name, grade, date)
    var df2 = List[(String, Integer, String)](
      ("李晓峰", 88, "2019-5-12"),
      ("李晓峰", 81, "2019-5-12"),
      ("李晓峰", 82, "2019-5-12"),
      ("李晓峰", 86, "2019-5-12")
    ).toDF("name", "grade", "date")
    println("Output of df2:")
    df2.show()
    // Cast the date column of both DataFrames to DateType
    df1 = df1.withColumn("date", $"date".cast(DateType))
    df2 = df2.withColumn("date", $"date".cast(DateType))
    // Merge the two DataFrames
    val result = df1.union(df2)
    println("Merged result:")
    result.show()
  }
}

Running the above code produces the following error:

Output of df1:
+------+---------+-----+
|  name|     date|grade|
+------+---------+-----+
|李奇峰|2019-5-12|   88|
|李奇峰|2019-5-12|   81|
|李奇峰|2019-5-12|   82|
|李奇峰|2019-5-12|   86|
+------+---------+-----+

Output of df2:
+------+-----+---------+
|  name|grade|     date|
+------+-----+---------+
|李晓峰|   88|2019-5-12|
|李晓峰|   81|2019-5-12|
|李晓峰|   82|2019-5-12|
|李晓峰|   86|2019-5-12|
+------+-----+---------+

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. int <> date at the second column of the second table;;
'Union
:- Project [name#7, cast(date#8 as date) AS date#26, grade#9]
:  +- Project [_1#3 AS name#7, _2#4 AS date#8, _3#5 AS grade#9]
:     +- LocalRelation [_1#3, _2#4, _3#5]
+- Project [name#20, grade#21, cast(date#22 as date) AS date#30]
   +- Project [_1#16 AS name#20, _2#17 AS grade#21, _3#18 AS date#22]
      +- LocalRelation [_1#16, _2#17, _3#18]

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$12$$anonfun$apply$13.apply(CheckAnalysis.scala:293)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$12$$anonfun$apply$13.apply(CheckAnalysis.scala:290)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$12.apply(CheckAnalysis.scala:290)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$12.apply(CheckAnalysis.scala:279)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:279)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
    at org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:3419)
    at org.apache.spark.sql.Dataset.union(Dataset.scala:1857)
    at SqlTest$.main(SqlTest.scala:34)
    at SqlTest.main(SqlTest.scala)

The error says the two DataFrames have incompatible column types, yet inspection shows that their column names and types are identical. Turning to the Union section of the Spark documentation reveals two details:

1. union is not equivalent to a mathematical set union: it does not remove duplicate rows (a sketch illustrating this follows the list).

2. union merges columns by position, not by name. The two DataFrames' column names may even differ; whatever columns occupy the same position get merged together.

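To make the first detail concrete, deduplication must be requested explicitly. A minimal sketch, assuming two DataFrames dfA and dfB with matching column order (hypothetical names, not from the code above):

// union keeps every row from both sides (bag semantics),
// so rows present in both inputs appear twice in the result.
val merged = dfA.union(dfB)
// distinct() gives set-union semantics by dropping duplicate rows.
val deduped = merged.distinct()
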
Re-reading the code with the second detail in mind, the cause is clear: the two DataFrames have the same column names and types, but the columns do not occupy the same positions, so the columns of one DataFrame must be reordered.
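The positional mismatch is easy to see by printing both schemas: the (name, type) pairs match as a set, but not position by position. A minimal sketch, with the expected output (inferred from the code above, after the date cast) shown as comments:

df1.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- date: date (nullable = true)
//  |-- grade: integer (nullable = true)
df2.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- grade: integer (nullable = true)
//  |-- date: date (nullable = true)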
The corrected code is as follows:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DateType

object SqlTest {
  def main(args: Array[String]): Unit = {
    // Quiet Spark's own logging
    Logger.getLogger("org").setLevel(Level.ERROR)
    // Initialize the SparkSession entry point
    val session: SparkSession = SparkSession.builder.appName("name").master("local[2]").getOrCreate()
    import session.implicits._
    // Create the first DataFrame: columns (name, date, grade)
    var df1 = List[(String, String, Integer)](
      ("李奇峰", "2019-5-12", 88),
      ("李奇峰", "2019-5-12", 81),
      ("李奇峰", "2019-5-12", 82),
      ("李奇峰", "2019-5-12", 86)
    ).toDF("name", "date", "grade")
    println("Output of df1:")
    df1.show()
    // Create the second DataFrame: columns (name, grade, date)
    var df2 = List[(String, Integer, String)](
      ("李晓峰", 88, "2019-5-12"),
      ("李晓峰", 81, "2019-5-12"),
      ("李晓峰", 82, "2019-5-12"),
      ("李晓峰", 86, "2019-5-12")
    ).toDF("name", "grade", "date")
    println("Output of df2:")
    df2.show()
    // Cast the date column of both DataFrames to DateType
    df1 = df1.withColumn("date", $"date".cast(DateType))
    df2 = df2.withColumn("date", $"date".cast(DateType))
    // Reorder df2's columns to match df1
    df2 = df2.select("name", "date", "grade")
    // Merge the two DataFrames
    println("Merged result:")
    df1.union(df2).show()
  }
}

The only difference from the previous version is one extra line that reorders df2's columns:
df2 = df2.select("name","date","grade")
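For tables with many columns, the ordering can be derived from df1 rather than spelled out by hand. A minimal sketch (assuming the same df1 and df2 as above):

import org.apache.spark.sql.functions.col
// Select df2's columns in exactly the order df1 declares them.
df2 = df2.select(df1.columns.map(col): _*)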
The output is now:

Output of df1:
+------+---------+-----+
|  name|     date|grade|
+------+---------+-----+
|李奇峰|2019-5-12|   88|
|李奇峰|2019-5-12|   81|
|李奇峰|2019-5-12|   82|
|李奇峰|2019-5-12|   86|
+------+---------+-----+

Output of df2:
+------+-----+---------+
|  name|grade|     date|
+------+-----+---------+
|李晓峰|   88|2019-5-12|
|李晓峰|   81|2019-5-12|
|李晓峰|   82|2019-5-12|
|李晓峰|   86|2019-5-12|
+------+-----+---------+

Merged result:
+------+----------+-----+
|  name|      date|grade|
+------+----------+-----+
|李奇峰|2019-05-12|   88|
|李奇峰|2019-05-12|   81|
|李奇峰|2019-05-12|   82|
|李奇峰|2019-05-12|   86|
|李晓峰|2019-05-12|   88|
|李晓峰|2019-05-12|   81|
|李晓峰|2019-05-12|   82|
|李晓峰|2019-05-12|   86|
+------+----------+-----+
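
As a final note, Spark 2.3 and later also offer Dataset.unionByName, which resolves columns by name instead of by position, making the manual reordering unnecessary. A minimal sketch:

// unionByName matches columns by name, so df2's original
// column order ("name", "grade", "date") no longer matters.
val result = df1.unionByName(df2)
result.show()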