不通过 Spark 获取 Delta Lake Snapshot

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Delta Lake 进行数据删除或更新操作时实际上只是对被删除数据文件做了一个 remove 标记,在进行 vacuum 前并不会进行物理删除,因此一些例如在 web 上获取元数据或进行部分数据展示的操作如果直接从表路径下获取 parquet 文件信息,读到的可能是历史已经被标记删除的数据。

​本文转载自公众号:偷闲小苑
原文链接

01 背景

Delta Lake 进行数据删除或更新操作时实际上只是对被删除数据文件做了一个 remove 标记,在进行 vacuum 前并不会进行物理删除,因此一些例如在 web 上获取元数据或进行部分数据展示的操作如果直接从表路径下获取 parquet 文件信息,读到的可能是历史已经被标记删除的数据。

Delta Lake 官方提供了 API 可以通过其 snapshot 获取相应表或分区对应的真实 parquet 路径,但其目前强依赖 Spark ,需要传入 SparkSession,例如

val snapshot = DeltaLog.forTable(spark, location).snapshot

如果仅仅只是想获取 snapshot,通过这种方式冷启动耗时会比较长。Delta Lake 的表事务目录 _delta_log 是以 parquet(checkpoint) + json 文件组成的,所以可以直接读取该 commit log ,实现一个简单的 Delta Lake 工具类用于获取相关信息,基本能在毫秒级获得结果。

02 DeltaHelper

以下为 Java 版本,同时也用 scala 实现了一版,感觉scala的代码可读性更高。



import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
​
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
​
/**
 * 用于读取 delta 数据
 */
public class DeltaHelper {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(DeltaHelper.class);
​
    public static List<FileStatus> loadFileStatus(String rawPath,
                                                  FileSystem fs) throws IOException {
        List<Path> pathList = load(rawPath, fs);
        List<FileStatus> input = new ArrayList<>();
        for (Path p : pathList) {
            input.add(fs.getFileStatus(p));
        }
        return input;
    }
​
    /**
     * 获取 delta 表真实 parquet 路径
     */
    public static List<Path> load(String rawPath,
                                  FileSystem fs) throws IOException {
        String tablePath = cutPartitionPath(rawPath);
        String partitionPath = tablePath.length() != rawPath.length() ? rawPath.substring(tablePath.length() + 1) : "";
        Path deltaLogPath = fs.makeQualified(new Path(tablePath, "_delta_log"));
        ArrayList<Path> result = new ArrayList<>();
        ArrayList<Path> parquetPathList = new ArrayList<>();
        LinkedList<String> checkPointPath = new LinkedList<>();
        LinkedList<String> afterCheckPointPath = new LinkedList<>();
        long lastCheckPointIndex = 0L;
​
        for (FileStatus fileStatus : fs.listStatus(deltaLogPath)) {
            Path path = fileStatus.getPath();
            if (path.toString().contains("parquet")) {
                parquetPathList.add(path);
            }
        }
        if (parquetPathList.size() != 0) {
            String lastCheckPointPath = parquetPathList.get(parquetPathList.size() - 1).toString();
            lastCheckPointIndex = getDeltaLogIndex(lastCheckPointPath, "parquet");
            checkPointPath = getCheckPointPath(lastCheckPointPath, fs.getConf(), partitionPath);
        }
        for (FileStatus fileStatus : fs.listStatus(deltaLogPath)) {
            Path path = fileStatus.getPath();
            if (path.toString().contains("json")) {
                // 不存在 checkpoint 的情况下读取全部 json,存在 checkpoint 的情况只读取 index 比 lastCheckPointIndex 的 json
                if (lastCheckPointIndex == 0 || getDeltaLogIndex(path.toString(), "json") > lastCheckPointIndex) {
                    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
                    String line;
                    line = br.readLine();
                    while (line != null) {
                        JSONObject obj = JSON.parseObject(line);
                        JSONObject addObj = obj.getJSONObject("add");
                        JSONObject removeObj = obj.getJSONObject("remove");
                        if (addObj != null) {
                            String addPath = addObj.getString("path");
                            if (StringUtils.isNoneEmpty(addPath) && partitionCond(addPath, partitionPath)) {
                                afterCheckPointPath.add(addPath);
                            }
                        } else if (removeObj != null) {
                            String removePath = removeObj.getString("path");
                            if (StringUtils.isNoneEmpty(removePath) && partitionCond(removePath, partitionPath)) {
                                checkPointPath.remove(removePath);
                                afterCheckPointPath.remove(removePath);
                            }
                        }
                        line = br.readLine();
                    }
                }
            }
        }
        checkPointPath.addAll(afterCheckPointPath);
        for (String path : checkPointPath) {
            result.add(new Path(tablePath + "/" + path));
        }
        return result;
    }
​
    /**
     * 判断表目录下是否存在 _delta_log
     * /user/hive/warehouse/db_name/table_name/_delta_log
     */
    public static boolean isDeltaTable(String path,
                                       FileSystem fs) throws IOException {
        Path deltaLogPath = fs.makeQualified(new Path(cutPartitionPath(path), "_delta_log"));
        return fs.exists(deltaLogPath);
    }
​
    /**
     * /a/b/c=1/d=2 => /a/b
     */
    private static String cutPartitionPath(String path) {
        String lastStr = path.substring(path.lastIndexOf("/") + 1);
        if (lastStr.contains("=")) {
            return cutPartitionPath(path.substring(0, path.indexOf(lastStr) - 1));
        } else {
            return path;
        }
    }
​
    /**
     * 获取 deltaLog 的索引
     */
    private static Long getDeltaLogIndex(String path,
                                         String format) {
        String index;
        if (format.equals("parquet")) {
            index = path.substring(path.indexOf("_delta_log/") + 11, path.indexOf(".checkpoint.parquet"));
        } else {
            index = path.substring(path.indexOf("_delta_log/") + 11, path.indexOf(".json"));
        }
        return Long.parseLong(index);
    }
​
    /**
     * 分区路径判断
     */
    private static boolean partitionCond(String path,
                                         String partition) {
        return (path.contains(partition) && StringUtils.isNoneBlank(partition)) || StringUtils.isBlank(partition);
    }
​
    /**
     * 从 checkpoint(parquet) 中获取对应的路径
     */
    private static LinkedList<String> getCheckPointPath(String path,
                                                        Configuration conf,
                                                        String partitionPath) {
        LinkedList<String> parquetList = new LinkedList<>();
        if (StringUtils.isNoneEmpty(path)) {
            try {
                ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(path)).withConf(conf).build();
                Group recordData = reader.read();
                while (recordData != null) {
                    String addPath;
                    String removePath;
                    try {
                        addPath = recordData.getGroup("add", 0).getString("path", 0);
                        if (partitionCond(addPath, partitionPath)) {
                            parquetList.add(addPath);
                        }
                    } catch (RuntimeException ignored) {
                    }
                    try {
                        removePath = recordData.getGroup("remove", 0).getString("path", 0);
                        if (partitionCond(removePath, partitionPath)) {
                            parquetList.remove(removePath);
                        }
                    } catch (RuntimeException ignored) {
                    }
                    recordData = reader.read();
                }
            } catch (IOException e) {
                LOGGER.error("读取 delta parquet checkpoint 失败");
            }
        }
        return parquetList;
    }
}

03使用

获取某表所有存活的 parquet 文件路径

input:
DeltaHelper.load("/user/hive/warehouse/db_name/table_name")  
output:
/user/hive/warehouse/db_name/table_name/a1.parquet

获取某表某分区所有存活的 parquet 文件路径

input:
DeltaHelper.load("/user/hive/warehouse/db_name/table_name/ds=20200101")
output:
/user/hive/warehouse/db_name/table_name/ds=20200101/a1.parquet

04 后记

能够理解 Databricks 团队对 Delta Lake 的定位及在海量数据场景下通过 Spark 读取的优势,但是目前过于依赖 Spark 生态圈,使得有些场景使用起来会比较困难。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
SQL 存储 人工智能
Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问
本文介绍了Databricks企业版Delta Lake的性能优势,借助这些特性能够大幅提升Spark SQL的查询性能,加快Delta表的查询速度。
366 0
Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问
|
分布式计算 Spark 数据格式
【spark系列8】spark delta读数据实现分析
【spark系列8】spark delta读数据实现分析
360 0
|
存储 SQL JSON
【spark系列7】spark delta写操作ACID事务实现分析
【spark系列7】spark delta写操作ACID事务实现分析
245 0
|
存储 分布式计算 vr&ar
【spark系列6】spark delta写操作ACID事务前传--写文件基础类FileFormat/FileCommitProtocol分析
【spark系列6】spark delta写操作ACID事务前传--写文件基础类FileFormat/FileCommitProtocol分析
214 0
|
SQL 存储 分布式计算
【spark系列5】spark 3.0.1集成delta 0.7.0原理解析--delta如何进行DDL DML操作以及Catalog plugin API
【spark系列5】spark 3.0.1集成delta 0.7.0原理解析--delta如何进行DDL DML操作以及Catalog plugin API
433 0
|
SQL 存储 分布式计算
【spark系列4】spark 3.0.1集成delta 0.7.0原理解析--delta自定义sql
【spark系列4】spark 3.0.1集成delta 0.7.0原理解析--delta自定义sql
304 0
|
SQL 存储 分布式计算
实战 | 利用Delta Lake使Spark SQL支持跨表CRUD操作
本文介绍eBay Carmel团队利用Delta Lake,使Spark SQL支持Teradata的Update/Delete语法。主要从源码角度介绍了CRUD操作的具体实现和优化,以及delta表的管理工作。希望对同业人员有所启发和帮助。
实战 | 利用Delta Lake使Spark SQL支持跨表CRUD操作
|
数据采集 消息中间件 存储
【译】Databricks使用Spark Streaming和Delta Lake对流式数据进行数据质量监控介绍
本文主要对Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控的方法和架构进行了介绍,本文探讨了一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。
【译】Databricks使用Spark Streaming和Delta Lake对流式数据进行数据质量监控介绍
|
分布式计算 Spark Python
开源生态的新发展:Apache Spark 3.0、Koala和Delta Lake
Hadoop开源生态Spark已经发展三年有余,今年迎来了Spark 3.0。在2019杭州云栖大会大数据&AI峰会上,Databricks研发总监李潇为大家分享了Spark 3.0版本的新特性,以及其在数据工程以及数据科学方面带来的新技术。
开源生态的新发展:Apache Spark 3.0、Koala和Delta Lake
|
分布式计算 Spark Apache
Apache Spark Delta Lake 删除使用及实现原理代码解析
Apache Spark Delta Lake 删除使用及实现原理代码解析 Delta Lake 的 Delete 功能是由 0.3.0 版本引入的。在介绍 Apache Spark Delta Lake 实现逻辑之前,我们先来看看如何使用 delete 这个功能。
2035 0