Delta Lake Presto Integration & Manifests 机制

简介: Delta 0.5 已于上周发布,增加了不少新特性,这篇文章主要讲解其 Presto Integration 和 Manifests 机制。

原文链接

该功能与我们之前平台化 Delta Lake 平台化实践(离线篇) 的很多工作都较为相似,比如与 metastore 的集成,直接通过 manifest 读取 delta 存活文件等。
Delta Lake 在 0.5 之前只支持通过 Spark 读取数据,在新版本中增加了其他处理引擎通过 manifest 文件访问 Delta Lake 的能力。下文以Presto 为例说明如何通过 manifest 文件访问数据,manifest 文件的生成及其一些限制。

01 使用

Presto 使用 manifest 文件从 hive 外部表中读取数据,manifest文件是一个文本文件,包含该表/分区所有存活数据的路径列表。

当使用 manifest 文件在 Hive metastore 中定义外部表时,Presto 将会先读取 manifest 中的文件路径列表再去访问想要的文件,而不是直接通过目录列表来查找文件。

1.1 通过 Spark 生成 manifest 文件

支持 sql / scala / java / python 四种 api,以 sql 和 scala 为例。

SQL

GENERATE symlink_format_manifest FOR TABLE delta.`pathToDeltaTable`

Scala

val deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")

使用 GENERATE 命令会在/path/to/deltaTable/_symlink_format_manifest/ 目录下生成一个 manifest 文件,其中包含了所有存活的文件路径。

查看清单文件

cat /path/to/deltaTable/_symlink_format_manifest/manifest
​
hdfs://tdhdfs-cs-hz/user/hive/warehouse/bigdata.db/delta_lsw_test/part-00000-0a69ce8d-0d9e-47e2-95b2-001bd196441d-c000.snappy.parquet
hdfs://tdhdfs-cs-hz/user/hive/warehouse/bigdata.db/delta_lsw_test/part-00000-ba1767cb-ff0e-4e65-8e83-7a0cdce6a2f4-c000.snappy.parquet

如果是分区表,例如以 ds 作为分区字段,生成的结构如果下,每个分区下都有一个 manifest 文件包含了该分区的存活文件路径。

/path/to/table/_delta_log
/path/to/table/ds=20190101
/path/to/table/ds=20190102
/path/to/table/_symlink_format_manifest
---- /path/to/table/_symlink_format_manifest/ds=20190101/manifest
---- /path/to/table/_symlink_format_manifest/ds=20190102/manifest

存活文件定义:add file - remove file

1.2 定义 Hive Metastore 外部表读取相应文件

CREATE EXTERNAL TABLE mytable ( ... )   -- 与 Delta table 一致的 schema 信息
PARTITIONED BY ( ... )  -- 分区参数可选,需要与 Delta table 一致
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '<pathToDeltaTable>/_symlink_format_manifest/'  -- 指定 manifest 地址

通过 SymlinkTextInputFormat ,Presto 可以直接从 manifest 中读取需要的文件而不需要直接定位到数据目录。

如果是分区表的话,在运行 generate 后,需要运行 MSCK REPAIR TABLE 使 Hive Metastore 能发现最新的分区。使用 repair 有两种场景:

  • 每次清单文件生成后调用:每次 generate 都调用 repair,这种方式在分区多的情况下性能表现会非常糟糕,我们的做法是在数据写入时从 spark 获取到相应的变更分区然后依次执行 ADD PARTITION操作。
  • 在需要新分区的时候调用:如果是按天粒度的分区表,可以选择在午夜12点创建新分区同时执行 generate 后运行一次 repair 。

important: 如果使用了 kerberos 认证,必须要在 presto 目录的etc/catalog/hive.properties 中配置 yarn-site.xml,否则在查询数据时会提示错误

com.facebook.presto.spi.PrestoException: Can't get Master Kerberos principal for use as renewer
  at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:191)
  at com.facebook.presto.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
  at com.facebook.presto.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
  at com.facebook.presto.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
  at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Can't get Master Kerberos principal for use as renewer
  at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
  at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
  at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
  at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:304)
  at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)
  at com.facebook.presto.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)
  at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)
  ... 7 more

02 Generate 过程

Generate 命令生成 manifest 的逻辑并不复杂,有兴趣的同学可以看下,方法入口:

DeltaGenerateCommand ->

GenerateSymlinkManifest.generateFullManifest(spark: SparkSession,deltaLog: DeltaLog)

  1. 在分区表每个分区或者非分区表中原子性的更新 manifest 文件
def writeSingleManifestFile(
manifestDirAbsPath: String,
dataFileRelativePaths: Iterator[String]): Unit = {
​val manifestFilePath = new Path(manifestDirAbsPath, "manifest")
      val fs = manifestFilePath.getFileSystem(hadoopConf.value)
      fs.mkdirs(manifestFilePath.getParent())
​
      val manifestContent = dataFileRelativePaths.map { relativePath =>
        DeltaFileOperations.absolutePath(tableAbsPathForManifest, relativePath).toString
      }
      val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value)
      logStore.write(manifestFilePath, manifestContent, overwrite = true)
    }
​
    // 我修复了 Delta 0.5 删除非分区表失效的 BUG,已将 PR 提交社区
    val newManifestPartitionRelativePaths =
      if (fileNamesForManifest.isEmpty && partitionCols.isEmpty) {
        writeSingleManifestFile(manifestRootDirPath, Iterator())
        Set.empty[String]
      } else {
        withRelativePartitionDir(spark, partitionCols, fileNamesForManifest)
          .select("relativePartitionDir", "path").as[(String, String)]
          .groupByKey(_._1).mapGroups {
          (relativePartitionDir: String, relativeDataFilePath: Iterator[(String, String)]) =>
            val manifestPartitionDirAbsPath = {
              if (relativePartitionDir == null || relativePartitionDir.isEmpty) manifestRootDirPath
              else new Path(manifestRootDirPath, relativePartitionDir).toString
            }
            writeSingleManifestFile(manifestPartitionDirAbsPath, relativeDataFilePath.map(_._2))
            relativePartitionDir
        }.collect().toSet
      }
  1. 删除分区表中失效分区的 manifest 文件
  val existingManifestPartitionRelativePaths = {
      val manifestRootDirAbsPath = fs.makeQualified(new Path(manifestRootDirPath))
      if (fs.exists(manifestRootDirAbsPath)) {
        val index = new InMemoryFileIndex(spark, Seq(manifestRootDirAbsPath), Map.empty, None)
        val prefixToStrip = manifestRootDirAbsPath.toUri.getPath
        index.inputFiles.map { p =>
          val relativeManifestFilePath =
            new Path(p).toUri.getPath.stripPrefix(prefixToStrip).stripPrefix(Path.SEPARATOR)
          new Path(relativeManifestFilePath).getParent.toString 
        }.filterNot(_.trim.isEmpty).toSet
      } else Set.empty[String]
    }
​
    val manifestFilePartitionsToDelete =
      existingManifestPartitionRelativePaths.diff(newManifestPartitionRelativePaths)
    deleteManifestFiles(manifestRootDirPath, manifestFilePartitionsToDelete, hadoopConf)

03 一些限制

3.1 数据一致性

在 Delta Lake 更新 manifest 时,它会原子的自动覆盖现有的 manifest 文件。因此,Presto 将始终看到一致的数据文件视图,然而,保证一致性的粒度取决于表是否分区。

非分区表

所有的文件路径都写在一个会原子更新的 manifest 文件中(参考上文结构),这种情况下 Presto 能看到一致性快照。

分区表

manifest 文件将以 hive 分区的目录结构 (参考上文结构),这意味着每个分区都是原子更新,所以 Presto 能看到一个分区内的一致性视图而不是跨分区的一致性视图。此外,由于所有的分区并不是同时更新,所以读取时可能会在不同分区中读到不同 manifest 版本。

简单的说,当在更新清单文件时,Presto 发起读请求,由于 manifest 所有分区并不是一次原子更新操作,所以有可能得到的结果并不是最新的数据。

3.2 性能

大量的文件数量会造成 Presto 性能下降,官方的建议是在执行 generate 生成 manifest 前先对文件进行 compact 操作。分区表的单个分区或是非分区表的文件数量不超过1000。

3.3 Schema 推断

原生的 Delta Lake 支持 schema evolution,意味着无论 hive metastore 定义的 schema 如何,都会基于文件使用最新的 Schema。由于 Presto 直接使用了定义在 hive metastore 中的 schema ,所以如果要修改 schema 信息,必须要对表进行相应更新 。

04 后记

一些BUG

测试过程中还发现了一个 BUG,如果将非分区表的数据全部删除,则 generate 后 manifest 不会更新。
已将 PR 提交社区 https://github.com/delta-io/delta/issues/275

实践经验

首先,由于需要额外的调用 generate 命令生成/更新 manifest 文件,使用体验肯定不如直接通过 Spark 读取数据。
其次,在 generate 过程中进行数据读取有可能会遇到跨分区查询版本不一致的情况,但是瑕不掩瑜,通过 manifest,与大数据生态其他处理引擎的道路被打开了。
就像在 Delta Lake 平台化实践(离线篇) 这篇文章中提到的,我们的大数据平台有一个功能是表数据/分区数据预览,通过 spark 去查用户体验会相当差(耗时长),我们之前的做法是自定义了一个工具类在查询时从 _delta_log中生成 manifest,再通过 manifest 获取到的文件路径直接从文件系统中读取 Parquet 实现,有了 generate 功能,就可以直接读取 manifest 文件,外部系统扩展工作量极大的简化。
在我们的生产环境中,presto 和 spark 使用的同一套 hive metastore ,但是 spark 直接读取上述创建的外部表会报错(就算能读也会有一致性风险),解决办法是在平台拦截了 sql 方法,通过判断 table properties 识别 delta 表,然后将其直接转化为 delta api 对数据进行操作,Presto 则是直接访问外表,解决了冲突的问题。

----

阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关文章
|
Java
groovy 规则初次尝试
根据不同的业务场景判断不同的数据,做成通用化
773 0
|
消息中间件 Java Kafka
Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka
  一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
3640 0
|
7月前
|
数据可视化 物联网 开发者
深度解析四大LLM微调工具:从单卡到千亿级训练的四大解决方案
本文详解大语言模型微调四大工具——Unsloth、Axolotl、LlamaFactory、DeepSpeed,覆盖从单卡实验到万亿参数分布式训练场景,助你掌握主流框架选型策略,提升微调效率。建议点赞收藏。
2417 1
|
存储 运维 监控
API明细日志及运维统计日志全面提升API可运维性
在数字化转型的大潮中,数据已成为企业最宝贵的资产之一。而数据服务API可快速为数据应用提供数据接口。面对越来越多的API以及越来越多的应用调用,如何快速查看API的服务情况、异常情况及影响范围,以及查看API的调用详情,进行API的性能优化、错误排查变得越来越重要,本文将介绍如何配置和开通API运维统计及明细日志,以及如何查看日志进行介绍。
726 0
|
11月前
|
存储 人工智能 弹性计算
阿里云服务器五代至八代实例对比:性能对比与精准选型指南参考
目前,阿里云服务器最新的实例规格已经升级到第九代,不过主售的云服务器实例规格还是以七代和八代云服务器为主。对于初次接触阿里云服务器实例规格的用户来说,可能并不清楚阿里云服务器五代、六代、七代、八代实例有哪些,以及它们之间有何区别。本文将详细介绍阿里云五代、六代、七代、八代云服务器实例规格,并对比它们在性能方面的提升,以供参考和选择。
|
XML Java API
List与String相互转化方法汇总
本文汇总了List与String相互转化的多种方法,包括使用`String.join()`、`StringBuilder`、Java 8的Stream API、Apache Commons Lang3的`StringUtils.join()`以及Guava的`Joiner.on()`方法实现List转String;同时介绍了使用`split()`方法、正则表达式、Apache Commons Lang3的`StringUtils.split()`及Guava的`Splitter.on()`方法实现String转List。
2548 1
List与String相互转化方法汇总
|
算法 安全 前端开发
基于postMessage和BroadcastChannel实现浏览器跨Tab窗口通信的方法介绍
基于postMessage和BroadcastChannel实现浏览器跨Tab窗口通信的方法介绍
632 1
|
数据可视化 定位技术 数据格式
看完这篇文章,我才知道 Python 制作动态图表的正确方式
关于动态图表,相信大家都或多或少的接触过一些,如果是代码水平比较不错的,可以选择 Matplotlib,当然也可以使用 pyecharts 的相关功能,不过这些工具都专注于图表的制作,也就是对于图表的数据,你是需要自行转换的。而今天介绍的这个可视化图库,完美的结合了 Pandas 数据格式,又辅以 Matplotlib 的强大功能,使得我们制作动图变得容易的多了。
看完这篇文章,我才知道 Python 制作动态图表的正确方式
|
存储 NoSQL 关系型数据库
一文探索应运而生的数据库们
本文系统性回顾了数据库技术的发展历程与现状,从层次数据库 IMS 到新兴的向量数据库 Milvus,每一类数据库的诞生都映射了特定时代的技术挑战与应用需求。
|
存储 安全 Linux
s3fs挂载S3对象桶
s3fs(Simple Storage Service File System)是一个基于FUSE(Filesystem in Userspace)的文件系统,它允许将S3(Simple Storage Service)或其他兼容S3 API的对象存储服务挂载到本地文件系统中,从而能够像访问本地磁盘一样访问远程对象存储。以下是通过s3fs挂载OBS(Object Storage Service,对象存储服务,这里以华为云OBS为例)对象桶的基本步骤: ### 一、环境准备 1. **安装s3fs**: - 对于CentOS系统,可以使用yum安装s3fs-fuse: ```
2840 7