Delta Lake 平台化实践(离线篇)

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文是在 Delta Lake 0.4 与 Spark 2.4 集成、平台化过程中的一些实践与思考

原文链接:https://blog.csdn.net/lsshlsw/article/details/103553289

博客主:breeze_lsw


01

SQL 支持

1.1 DML

背景

delta lake 0.4 只支持以 api 的方式使用 Delete/Update/Merge Into 等 DML,对习惯了使用 sql 的终端用户会增加其学习使用成本。

解决方式

下文通过 spark sql extension 以插件化的方式扩展 sql parser ,增加 DML 语法的支持。在 spark 推出 sql extension 功能前,也可以用通过 aspectj 通过拦截 sql 的方式实现增加自定义语法的功能。

1.在自定义扩展 g4 文件中相应的 antlr4 DML 语法,部分参考了 databricks 商业版的语法

statement
    : DELETE FROM table=qualifiedName tableAlias
        (WHERE where=booleanExpression)?                              #deleteFromTable
    | UPDATE table=qualifiedName tableAlias upset=setClause
        (WHERE where=booleanExpression)?                              #updateTable
    | MERGE INTO target=qualifiedName targetAlias=tableAlias
        USING (source=qualifiedName |
            '(' sourceQuery=query')') sourceAlias=tableAlias
            ON mergeCondition=booleanExpression
            matchedClause*
            notMatchedClause*                                               #mergeIntoTable

2.实现对应的 visit,将 sql 翻译为 delta api,以最简单的 delete 为例

override def visitDeleteFromTable(ctx: DeleteFromTableContext): AnyRef = withOrigin(ctx) {
    DeleteTableCommand(
        visitTableIdentifier(ctx.table),
         Option(getText(ctx.where)))
}

case class DeleteTableCommand(table: TableIdentifier,
                              where: Option[String]) extends RunnableCommand {
     override def run(sparkSession: SparkSession): Seq[Row] = {
       DeltaUtils.deltaTableCheck(sparkSession, table, "DELETE")
       val deltaTable = DeltaUtils.getDeltaTable(sparkSession, table)
       if (where.isEmpty) {
         deltaTable.delete()
       } else {
         deltaTable.delete(where.get)
       }
       Seq.empty[Row]
     }
}

3.启动 Spark 时加载打包的 extension jar ,初始化 SparkSession 时指定 Extension 类。

val spark = SparkSession.builder
    .enableHiveSupport()
    .config("spark.sql.extensions", "cn.tongdun.spark.sql.TDExtensions")

tip

spark 3 之前不支持配置多个 extension ,如果遇到使用多个 extension 的情况,可以将多个 extension 在一个 extension 代码中进行注入。
以同时增加 tispark extension 和 自定义 extension 为例

override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser(TiParser(getOrCreateTiContext))
    extensions.injectResolutionRule(TiDDLRule(getOrCreateTiContext))
    extensions.injectResolutionRule(TiResolutionRule(getOrCreateTiContext))
    extensions.injectPlannerStrategy(TiStrategy(getOrCreateTiContext))
    extensions.injectParser { (session, parser) => new TDSparkSqlParser(session, parser)}
}

1.2 Query

识别 delta table 有三种实现方式

  1. 使用相应表名前缀/后缀作为标识
  2. 在 table properties 中增加相应的参数进行识别
  3. 判断表目录下是否存在_delta_log

我们一开始是使用 delta_ 的前缀作为 delta 表名标识,这样实现最为简单,但是如果用户将 hive(parquet) 表转为 hive(delta) ,要是表名发生变化则需要修改相关代码,所以后面改为在table propertie 中增加相应的参数进行识别。
也可以通过判断是否存在 _delta_log 文件识别,该方式需要在建表时写入带有 schema 信息的空数据。
Query 通过对sql执行进行拦截,判断 Statement 为 SELECT 类型,然后将 delta 表的查询翻译成对应的 api 进行查询。

if (statementType == SELECT) {
    TableData tableData = (TableData) statementData.getStatement();
    sql = DatasourceAdapter.selectAdapter(tableData, sparkSession, sql);
}

1.3 Insert

Insert 需要考虑 INSERT_VALUES/INSERT_SELECT ,还有分区表/非分区表以及写入方式的一些情况。

sql 类型判断

  if (INSERT_SELECT == statementType) {
    isDeltaTable = DatasourceAdapter.deltaInsertSelectAdapter(sparkSession, statementData);
} else if (INSERT_VALUES == statementType) {
    isDeltaTable = DatasourceAdapter.deltaInsertValuesAdapter(sparkSession, statementData);
}

INSERT_INTO 需要从 catalog 中获取对应的 schema 信息,并将 values 转化为 dataFrame

val rows = statementData.getValues.asScala.map(_.asScala.toSeq).map { x => Row(x: _*) }
import spark.implicits._
val schemaStr = spark.catalog.listColumns(dbName, tableName)
    .map(col => col.name + " " + col.dataType)
    .collect().mkString(",")
val schema = StructType.fromDDL(schemaStr)
val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows), schema)

INSERT_SELECT 则直接访问被解析过的 Delta Query 子句。

partition

由于 delta api 的限制,不支持静态分区,可以从 tableMeta 中解析到对应的动态分区名,使用 partitionBy 写入即可。
至此,已经实现使用 apache spark 2.4 使用 sql 直接操作 delta table 表。

02

平台化工作

与 hive metastore 的集成,表数据管理 等平台化的一些工作。

2.1 浏览 delta 数据

用户在平台上点击浏览数据,如果通过 delta api ,启动 spark job 的方式从 HDFS 读取数据,依赖重,延时高,用户体验差。
基于之前在 parquet 格式上的一些工作,浏览操作可以简化为找出 delta 事务日志中还存活 (add - remove) 的 parquet 文件进行读取,这样就避免了启动 spark 的过程,大多数情况能做到毫秒级返回数据。
需要注意的是,_delta_log 文件只存在父目录,浏览某个分区的数据同样需要浏览父目录获取相应分区内的存活文件。

// DeltaHelper.load 方法会从 _delta_log 目录中找到存活 parquet 文件,然后使用 ParquetFileReader 读取
List<Path> inputFiles;
if (DeltaHelper.isDeltaTable(dir, conf)) {
    inputFiles = DeltaHelper.load(dir, conf);
} else {
    inputFiles = getInputFilesFromDirectory(projectCode, dir);
}

从 delta 0.5 开始,浏览数据的功能可以通过 manifest 文件进行更简单的实现,具体内容可以参考下一篇文章。

2.2 浏览 delta 数据

将原生 delta lake 基于 path 的工作方式与 hive metastore 进行兼容。

数据写入/删除

数据动态分区插入 - 统计写入的分区信息(我们是通过修改了 spark write 部分的代码得到的写入分区信息),如果分区不存在则自动增加分区 add partition if ...。还有一种更简单的做法是直接使用 msck repair table ,但是这种方式在分区多的情况下,性能会非常糟糕。

删除分区 - 在界面上操作对某个分区进行删除时,后台调用 delta 删除api,并更新相关 partition 信息。

元数据信息更新

元数据中表/分区记录数,大小等元数据的更新支持。

2.3 碎片文件整理

  • 非 delta lake 表小文件整理方式可以参考我之前在 csdn 上的文章。这种方式采用的是在数据生成后校验,如果有碎片文件则进行同步合并,Spark 小文件合并优化实践
  • 非 delta lake 表的小文件整理使用的是同步模式,可能会影响到下有任务的启动时间。

基于 delta lake 的小文件整理要分为两块,存活数据和标记删除的数据

  1. 标记删除的数据
    被 delta 删除的数据,底层 parquet 文件依旧存在,只是在 delta_log 中做了标记,读取时跳过了该文件。

可以使用 delta 自带的 vacuum 功能删除一定时间之前标记删除的数据。

  1. 存活数据
    可以实现一个 compaction 功能,在后台定时做异步合并,由于 delta 支持事务管理的特性,该过程对用户透明,合并过程中保证了数据一致性且不会中断任务。

03

结语

3.1 一些限制

由于 delta api 的限制,目前 delta delete / update 不支持子句,可以使用 merge into 语法实现相同功能。
由于 delta api 的限制,只支持动态分区插入。

3.2 merge 使用场景

upsert

有 a1,a2 两张表,如果 a.1eventId = a2.eventId ,则 a2.data 会覆盖 a1.data,否则将 a2 表中相应的数据插入到 a1 表

MERGE INTO bigdata.table1 a1
USING bigdata.table2 a2
ON a1.eventId = a2.eventId
WHEN MATCHED THEN
  UPDATE SET a1.data = a2.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (a2.date, a2.eventId, a2.data)

ETL 避免数据重复场景

如果 uniqueid 只存在于 a2 表,则插入 a2 表中的相应记录

MERGE INTO logs a1
USING updates a2
ON a1.uniqueId = a2.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

维度表更新场景

  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 true ,则删除 a1 表相应记录
  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 false ,则将 a2 表相应记录的 value 更新到 a1 表中
  • 如果没有匹配到相应合作方,且 a2 中 deleted 为 fasle ,则将 a2 表相应记录插入到 a1 表
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a2.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET a1.value = a2.newValue
WHEN NOT MATCHED AND a2.deleted = false THEN INSERT (partnerCode, value) VALUES (partnerCode, newValue)

历史数据清理场景

如果 a1 和 a2 表的合作方相同,则删除 a1 表中 ds < 20190101 的所有数据

MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a1.ds < '20190101' THEN
  DELETE

阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。image.png
Apache Spark技术交流社区公众号,微信扫一扫关注image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
SQL 存储 JSON
基于 Delta Lake 构建数据湖仓体系
本文整理自阿里云开源大数据平台技术专家毕岩在7月17日阿里云数据湖技术专场交流会的分享。
基于 Delta Lake 构建数据湖仓体系
|
存储 SQL 机器学习/深度学习
Delta Lake 数据湖基础介绍(商业版)| 学习笔记
快速学习Delta Lake 数据湖基础介绍(商业版)
304 0
Delta Lake 数据湖基础介绍(商业版)| 学习笔记
|
存储 SQL 人工智能
如何使用 Delta Lake 构建批流一体数据仓库| 学习笔记
快速学习如何使用 Delta Lake 构建批流一体数据仓库
242 0
如何使用 Delta Lake 构建批流一体数据仓库| 学习笔记
|
SQL 存储 分布式计算
数据湖揭秘—Delta Lake
Delta Lake 是 DataBricks 公司开源的、用于构建湖仓架构的存储框架。能够支持 Spark,Flink,Hive,PrestoDB,Trino 等查询/计算引擎。作为一个开放格式的存储层,它在提供了批流一体的同时,为湖仓架构提供可靠的,安全的,高性能的保证。
4081 7
数据湖揭秘—Delta Lake
|
存储 SQL 人工智能
如何使用Delta Lake构建批流一体数据仓库【Databricks 数据洞察公开课】
Delta Lake是一个开源存储层,它为数据湖带来了可靠性。Delta Lake提供了ACID事务、可扩展的元数据处理,并统一了流式处理和批处理数据处理。Delta-Lake运行在现有数据湖之上,并且与Apache Spark API完全兼容。希望本篇能让大家更深入了解Delta Lake,最终可以实践到工作当中。
472 0
如何使用Delta Lake构建批流一体数据仓库【Databricks 数据洞察公开课】
|
存储 机器学习/深度学习 SQL
初探数据湖(Data Lake),到底有什么用?让我们来一窥究竟...
初探数据湖(Data Lake),到底有什么用?让我们来一窥究竟...
1350 0
初探数据湖(Data Lake),到底有什么用?让我们来一窥究竟...
|
存储 SQL JSON
Delta Lake基础介绍(开源版)【Databricks 数据洞察公开课】
针对社区版本Delta Lake提供的几大核心特性进行讲解,并通过示例演示如何使用这些特性。
880 0
Delta Lake基础介绍(开源版)【Databricks 数据洞察公开课】
|
存储 SQL 缓存
Delta Lake基础介绍(商业版)【Databricks 数据洞察公开课】
介绍 Lakehouse 搜索引擎的设计思想,探讨其如何使用缓存,辅助数据结构,存储格式,动态文件剪枝,以及 vectorized execution 达到优越的处理性能。
379 0
Delta Lake基础介绍(商业版)【Databricks 数据洞察公开课】
|
SQL 存储 人工智能
Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问
本文介绍了Databricks企业版Delta Lake的性能优势,借助这些特性能够大幅提升Spark SQL的查询性能,加快Delta表的查询速度。
356 0
Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问
|
存储 分布式计算 DataWorks
基于Delta lake、Hudi格式的湖仓一体方案
Delta Lake 和 Hudi 是流行的开放格式的存储层,为数据湖同时提供流式和批处理的操作,这允许我们在数据湖上直接运行 BI 等应用,让数据分析师可以即时查询新的实时数据,从而对您的业务产生即时的洞察。MaxCompute 在湖仓一体架构中,通过支持 Delta Lake 和 Hudi 在数据湖中提供数据仓库性能。
1455 2
基于Delta lake、Hudi格式的湖仓一体方案