不通过 Spark 获取 Delta Lake Snapshot-阿里云开发者社区

开发者社区> Apache Spark中国社区> 正文

不通过 Spark 获取 Delta Lake Snapshot

简介: Delta Lake 进行数据删除或更新操作时实际上只是对被删除数据文件做了一个 remove 标记,在进行 vacuum 前并不会进行物理删除,因此一些例如在 web 上获取元数据或进行部分数据展示的操作如果直接从表路径下获取 parquet 文件信息,读到的可能是历史已经被标记删除的数据。

​本文转载自公众号:偷闲小苑
原文链接

01 背景

Delta Lake 进行数据删除或更新操作时实际上只是对被删除数据文件做了一个 remove 标记,在进行 vacuum 前并不会进行物理删除,因此一些例如在 web 上获取元数据或进行部分数据展示的操作如果直接从表路径下获取 parquet 文件信息,读到的可能是历史已经被标记删除的数据。

Delta Lake 官方提供了 API 可以通过其 snapshot 获取相应表或分区对应的真实 parquet 路径,但其目前强依赖 Spark ,需要传入 SparkSession,例如

val snapshot = DeltaLog.forTable(spark, location).snapshot

如果仅仅只是想获取 snapshot,通过这种方式冷启动耗时会比较长。Delta Lake 的表事务目录 _delta_log 是以 parquet(checkpoint) + json 文件组成的,所以可以直接读取该 commit log ,实现一个简单的 Delta Lake 工具类用于获取相关信息,基本能在毫秒级获得结果。

02 DeltaHelper

以下为 Java 版本,同时也用 scala 实现了一版,感觉scala的代码可读性更高。



import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
​
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
​
/**
 * 用于读取 delta 数据
 */
public class DeltaHelper {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(DeltaHelper.class);
​
    public static List<FileStatus> loadFileStatus(String rawPath,
                                                  FileSystem fs) throws IOException {
        List<Path> pathList = load(rawPath, fs);
        List<FileStatus> input = new ArrayList<>();
        for (Path p : pathList) {
            input.add(fs.getFileStatus(p));
        }
        return input;
    }
​
    /**
     * 获取 delta 表真实 parquet 路径
     */
    public static List<Path> load(String rawPath,
                                  FileSystem fs) throws IOException {
        String tablePath = cutPartitionPath(rawPath);
        String partitionPath = tablePath.length() != rawPath.length() ? rawPath.substring(tablePath.length() + 1) : "";
        Path deltaLogPath = fs.makeQualified(new Path(tablePath, "_delta_log"));
        ArrayList<Path> result = new ArrayList<>();
        ArrayList<Path> parquetPathList = new ArrayList<>();
        LinkedList<String> checkPointPath = new LinkedList<>();
        LinkedList<String> afterCheckPointPath = new LinkedList<>();
        long lastCheckPointIndex = 0L;
​
        for (FileStatus fileStatus : fs.listStatus(deltaLogPath)) {
            Path path = fileStatus.getPath();
            if (path.toString().contains("parquet")) {
                parquetPathList.add(path);
            }
        }
        if (parquetPathList.size() != 0) {
            String lastCheckPointPath = parquetPathList.get(parquetPathList.size() - 1).toString();
            lastCheckPointIndex = getDeltaLogIndex(lastCheckPointPath, "parquet");
            checkPointPath = getCheckPointPath(lastCheckPointPath, fs.getConf(), partitionPath);
        }
        for (FileStatus fileStatus : fs.listStatus(deltaLogPath)) {
            Path path = fileStatus.getPath();
            if (path.toString().contains("json")) {
                // 不存在 checkpoint 的情况下读取全部 json,存在 checkpoint 的情况只读取 index 比 lastCheckPointIndex 的 json
                if (lastCheckPointIndex == 0 || getDeltaLogIndex(path.toString(), "json") > lastCheckPointIndex) {
                    BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
                    String line;
                    line = br.readLine();
                    while (line != null) {
                        JSONObject obj = JSON.parseObject(line);
                        JSONObject addObj = obj.getJSONObject("add");
                        JSONObject removeObj = obj.getJSONObject("remove");
                        if (addObj != null) {
                            String addPath = addObj.getString("path");
                            if (StringUtils.isNoneEmpty(addPath) && partitionCond(addPath, partitionPath)) {
                                afterCheckPointPath.add(addPath);
                            }
                        } else if (removeObj != null) {
                            String removePath = removeObj.getString("path");
                            if (StringUtils.isNoneEmpty(removePath) && partitionCond(removePath, partitionPath)) {
                                checkPointPath.remove(removePath);
                                afterCheckPointPath.remove(removePath);
                            }
                        }
                        line = br.readLine();
                    }
                }
            }
        }
        checkPointPath.addAll(afterCheckPointPath);
        for (String path : checkPointPath) {
            result.add(new Path(tablePath + "/" + path));
        }
        return result;
    }
​
    /**
     * 判断表目录下是否存在 _delta_log
     * /user/hive/warehouse/db_name/table_name/_delta_log
     */
    public static boolean isDeltaTable(String path,
                                       FileSystem fs) throws IOException {
        Path deltaLogPath = fs.makeQualified(new Path(cutPartitionPath(path), "_delta_log"));
        return fs.exists(deltaLogPath);
    }
​
    /**
     * /a/b/c=1/d=2 => /a/b
     */
    private static String cutPartitionPath(String path) {
        String lastStr = path.substring(path.lastIndexOf("/") + 1);
        if (lastStr.contains("=")) {
            return cutPartitionPath(path.substring(0, path.indexOf(lastStr) - 1));
        } else {
            return path;
        }
    }
​
    /**
     * 获取 deltaLog 的索引
     */
    private static Long getDeltaLogIndex(String path,
                                         String format) {
        String index;
        if (format.equals("parquet")) {
            index = path.substring(path.indexOf("_delta_log/") + 11, path.indexOf(".checkpoint.parquet"));
        } else {
            index = path.substring(path.indexOf("_delta_log/") + 11, path.indexOf(".json"));
        }
        return Long.parseLong(index);
    }
​
    /**
     * 分区路径判断
     */
    private static boolean partitionCond(String path,
                                         String partition) {
        return (path.contains(partition) && StringUtils.isNoneBlank(partition)) || StringUtils.isBlank(partition);
    }
​
    /**
     * 从 checkpoint(parquet) 中获取对应的路径
     */
    private static LinkedList<String> getCheckPointPath(String path,
                                                        Configuration conf,
                                                        String partitionPath) {
        LinkedList<String> parquetList = new LinkedList<>();
        if (StringUtils.isNoneEmpty(path)) {
            try {
                ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(path)).withConf(conf).build();
                Group recordData = reader.read();
                while (recordData != null) {
                    String addPath;
                    String removePath;
                    try {
                        addPath = recordData.getGroup("add", 0).getString("path", 0);
                        if (partitionCond(addPath, partitionPath)) {
                            parquetList.add(addPath);
                        }
                    } catch (RuntimeException ignored) {
                    }
                    try {
                        removePath = recordData.getGroup("remove", 0).getString("path", 0);
                        if (partitionCond(removePath, partitionPath)) {
                            parquetList.remove(removePath);
                        }
                    } catch (RuntimeException ignored) {
                    }
                    recordData = reader.read();
                }
            } catch (IOException e) {
                LOGGER.error("读取 delta parquet checkpoint 失败");
            }
        }
        return parquetList;
    }
}

03使用

获取某表所有存活的 parquet 文件路径

input:
DeltaHelper.load("/user/hive/warehouse/db_name/table_name")  
output:
/user/hive/warehouse/db_name/table_name/a1.parquet

获取某表某分区所有存活的 parquet 文件路径

input:
DeltaHelper.load("/user/hive/warehouse/db_name/table_name/ds=20200101")
output:
/user/hive/warehouse/db_name/table_name/ds=20200101/a1.parquet

04 后记

能够理解 Databricks 团队对 Delta Lake 的定位及在海量数据场景下通过 Spark 读取的优势,但是目前过于依赖 Spark 生态圈,使得有些场景使用起来会比较困难。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:

阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎加入!邀请你加入钉钉群聊Apache Spark中国技术交流社区,点击进入查看详情 https://qr.dingtalk.com/action/joingroup?code=v1,k1,X7S/0/QcrLMkK7QZ5sw2oTvoYW49u0g5dvGu7PW+sm4=&_dt_no_comment=1&origin=11

官方博客
官网链接