Delta Lake 分区表覆盖写入操作

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Delta Lake当前版本(0.5)只支持API操作的,但是实现 Insert SQL 语法也不难,需要注意的是 Delta Lake 中的分区表覆盖写入操作。

01 覆盖写

INSERT OVERWRITE TABLE 分区表时分三种场景:

  • 动态分区 - 写入前会删除所有分区,并根据数据中的分区字段写入相应新分区
  • 静态分区 - 只会对指定的分区进行覆盖写操作
  • 混合分区(动态+静态分区) - 上述两种情况的结合

如果想通过 SQL 转化为上述 API ,首先需要在 sql parser 的时候获取到 insertMode 和 partitions 信息,并将 partitions 信息存在一个有序的结构中,例如 LinkedHashMap。然后利用这些信息,就可以拼装进行拼装实现上述三种场景。

1.1 动态分区

对所有 ds 分区进行覆盖写操作,将会清空所有 ds 分区

sql

INSERT OVERWRITE TABLE db.tableA partition(ds) 
  select name,ds from db.tableB

Delta Lake API

df.write.format("delta").mode("overwrite").partitionBy(ds)

1.2 静态分区

对 ds=20200101 的分区进行覆盖写操作,Delta 不能直接将数据写入分区目录

sql

INSERT OVERWRITE TABLE db.tableA partition(ds=20200101) 
  select name from db.tableB

Delta Lake API

df.write.format("delta").mode("overwrite")
.option("replaceWhere", "ds = 20200101").partitionBy(ds)

1.3 混合分区

对 ds=20200101 中的所有 event 的分区进行覆盖写操作,将会清空所有 event 分区
sql

INSERT OVERWRITE TABLE db.tableA partition(ds=20200101,event) 
  select name,event from db.tableB

Delta Lake API

df.write.format("delta").mode("overwrite")
.option("replaceWhere", "ds = 2020010

02 ​后记

  • 分区操作,一定要保证 partition 信息的有序
  • 新表需要从 hive metastore 中获取 partition 信息,Delta Table 在第一次写入数据前,是不会生成 _DELTA_LOG 目录的,此时可以从 hive metastore 中获取建表时的分区名和其对应的类型,例如:
//ddl: `ds` INT COMMENT 'ds'
val ddl = spark.sharedState.externalCatalog.getTable(dbName, tableName).partitionSchema.toDDL
val partitionNameAndType = new mutable.HashMap[String, String]()

ddl.split(",").foreach { r =>
  val x = r.split(" ")
  partitionNameAndType.put(x(0).replace("`", ""), x(1))
}

-语义不同

Hive Table 直接使用 insert overwrite 动态分区只会覆盖数据涉及到的分区,而 Spark 和 Delta Lake 的 API 则会将所有所有分区进行覆盖。Spark 2.3 以后也可以通过下述API实现 Hive insert overwrite 语义

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

-动态分区覆盖写是高危操作

该操作很有可能会删除一些你不期望的数据,所以 Delta Lake 目前的 API 提供了 replaceWhere option 进行约束


原文链接


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。image.png
Spark技术交流社区公众号,微信扫一扫关注image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
3月前
|
存储 NoSQL 关系型数据库
MPP架构数据仓库使用问题之Visibility bitmap表被删除的文件信息是如何记录的
MPP架构数据仓库使用问题之Visibility bitmap表被删除的文件信息是如何记录的
|
6月前
|
SQL 资源调度 Oracle
实时计算 Flink版产品使用合集之是否支持通过 TRUNCATE TABLE 语句来截断(truncate)表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL
云架构数据倾斜问题之在SQL数据源读取查询时合并小文件如何解决
云架构数据倾斜问题之在SQL数据源读取查询时合并小文件如何解决
|
5月前
|
SQL JSON 数据库
实时计算 Flink版操作报错合集之写入Hudi时,遇到从 COW(Copy-On-Write)表类型转换为 MOR(Merge-On-Read)表类型时报字段错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL 分布式计算 Java
实时数仓 Hologres产品使用合集之ologres holostudio为什么不支持max_pt('table')取最大分区这个方法
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
6月前
|
SQL 分布式计算 DataWorks
实时数仓 Hologres产品使用合集之查询分区表的生命周期(即之前设置的'auto_partitioning.num_retention'值)的SQL语句,可以使用什么查询
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
107 0
|
6月前
|
分布式计算 测试技术 Apache
Apache Hudi vs Delta Lake:透明TPC-DS Lakehouse性能基准
Apache Hudi vs Delta Lake:透明TPC-DS Lakehouse性能基准
117 4
|
6月前
|
JavaScript 数据格式
产品说这个 table 这里数据需要合并
产品说这个 table 这里数据需要合并
65 0
|
存储 分布式计算 Cloud Native
Hologres揭秘:优化COPY,批量导入性能提升5倍+
揭秘Hologres优化COPY的技术原理,实现批量导入性能提升5倍+。
3870 0
Hologres揭秘:优化COPY,批量导入性能提升5倍+