01 覆盖写
INSERT OVERWRITE TABLE 分区表时分三种场景:
- 动态分区 - 写入前会删除所有分区,并根据数据中的分区字段写入相应新分区
- 静态分区 - 只会对指定的分区进行覆盖写操作
- 混合分区(动态+静态分区) - 上述两种情况的结合
如果想通过 SQL 转化为上述 API ,首先需要在 sql parser 的时候获取到 insertMode 和 partitions 信息,并将 partitions 信息存在一个有序的结构中,例如 LinkedHashMap。然后利用这些信息,就可以拼装进行拼装实现上述三种场景。
1.1 动态分区
对所有 ds 分区进行覆盖写操作,将会清空所有 ds 分区
sql
INSERT OVERWRITE TABLE db.tableA partition(ds)
select name,ds from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite").partitionBy(ds)
1.2 静态分区
对 ds=20200101 的分区进行覆盖写操作,Delta 不能直接将数据写入分区目录
sql
INSERT OVERWRITE TABLE db.tableA partition(ds=20200101)
select name from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite")
.option("replaceWhere", "ds = 20200101").partitionBy(ds)
1.3 混合分区
对 ds=20200101 中的所有 event 的分区进行覆盖写操作,将会清空所有 event 分区
sql
INSERT OVERWRITE TABLE db.tableA partition(ds=20200101,event)
select name,event from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite")
.option("replaceWhere", "ds = 2020010
02 后记
- 分区操作,一定要保证 partition 信息的有序
- 新表需要从 hive metastore 中获取 partition 信息,Delta Table 在第一次写入数据前,是不会生成 _DELTA_LOG 目录的,此时可以从 hive metastore 中获取建表时的分区名和其对应的类型,例如:
//ddl: `ds` INT COMMENT 'ds'
val ddl = spark.sharedState.externalCatalog.getTable(dbName, tableName).partitionSchema.toDDL
val partitionNameAndType = new mutable.HashMap[String, String]()
ddl.split(",").foreach { r =>
val x = r.split(" ")
partitionNameAndType.put(x(0).replace("`", ""), x(1))
}
-语义不同
Hive Table 直接使用 insert overwrite 动态分区只会覆盖数据涉及到的分区,而 Spark 和 Delta Lake 的 API 则会将所有所有分区进行覆盖。Spark 2.3 以后也可以通过下述API实现 Hive insert overwrite 语义
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
-动态分区覆盖写是高危操作
该操作很有可能会删除一些你不期望的数据,所以 Delta Lake 目前的 API 提供了 replaceWhere option 进行约束
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
Spark技术交流社区公众号,微信扫一扫关注