This article is reposted from the WeChat public account 偷闲小苑.
01 Background
When Delta Lake deletes or updates data, it only writes a remove marker for the affected data files; nothing is physically deleted until vacuum runs. So if an operation such as showing metadata or previewing part of the data on a web page reads parquet files straight from the table path, it may pick up historical data that has already been marked as removed.
Delta Lake does provide an official API for obtaining the real parquet paths of a table or partition through its snapshot, but it currently depends heavily on Spark and requires a SparkSession, for example:
val snapshot = DeltaLog.forTable(spark, location).snapshot
If all you want is the snapshot, the cold start of this approach is fairly slow. Delta Lake's transaction log directory _delta_log consists of parquet (checkpoint) files plus json files, so we can read the commit log directly and implement a simple Delta Lake utility class to extract the same information, usually returning results within milliseconds.
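For reference, the log directory and a commit entry look roughly like the hypothetical listing below: commits are zero-padded json files, a checkpoint parquet file that compacts all earlier actions is written periodically (every 10 commits by default), and each json line carries a single action such as add or remove (fields abbreviated):

/user/hive/warehouse/db_name/table_name/_delta_log/00000000000000000009.json
/user/hive/warehouse/db_name/table_name/_delta_log/00000000000000000010.checkpoint.parquet
/user/hive/warehouse/db_name/table_name/_delta_log/00000000000000000011.json

{"add":{"path":"ds=20200101/a1.parquet","partitionValues":{"ds":"20200101"},"size":1024,"dataChange":true}}
{"remove":{"path":"ds=20200101/a0.parquet","deletionTimestamp":1577808000000,"dataChange":true}}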
02 DeltaHelper
The following is the Java version; I also implemented a Scala version, which I find more readable.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * Utility class for reading the live data files of a Delta table.
 */
public class DeltaHelper {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeltaHelper.class);

    public static List<FileStatus> loadFileStatus(String rawPath,
                                                  FileSystem fs) throws IOException {
        List<Path> pathList = load(rawPath, fs);
        List<FileStatus> input = new ArrayList<>();
        for (Path p : pathList) {
            input.add(fs.getFileStatus(p));
        }
        return input;
    }

    /**
     * Get the real (live) parquet paths of a Delta table or partition.
     */
    public static List<Path> load(String rawPath,
                                  FileSystem fs) throws IOException {
        String tablePath = cutPartitionPath(rawPath);
        String partitionPath = tablePath.length() != rawPath.length() ? rawPath.substring(tablePath.length() + 1) : "";
        Path deltaLogPath = fs.makeQualified(new Path(tablePath, "_delta_log"));
        ArrayList<Path> result = new ArrayList<>();
        ArrayList<Path> parquetPathList = new ArrayList<>();
        LinkedList<String> checkPointPath = new LinkedList<>();
        LinkedList<String> afterCheckPointPath = new LinkedList<>();
        long lastCheckPointIndex = -1L;
        for (FileStatus fileStatus : fs.listStatus(deltaLogPath)) {
            Path path = fileStatus.getPath();
            if (path.toString().contains("parquet")) {
                parquetPathList.add(path);
            }
        }
        if (!parquetPathList.isEmpty()) {
            // listStatus does not guarantee ordering, so sort by name (versions are zero-padded)
            // to make sure the last element is the latest checkpoint.
            parquetPathList.sort((a, b) -> a.toString().compareTo(b.toString()));
            String lastCheckPointPath = parquetPathList.get(parquetPathList.size() - 1).toString();
            lastCheckPointIndex = getDeltaLogIndex(lastCheckPointPath, "parquet");
            checkPointPath = getCheckPointPath(lastCheckPointPath, fs.getConf(), partitionPath);
        }
        for (FileStatus fileStatus : fs.listStatus(deltaLogPath)) {
            Path path = fileStatus.getPath();
            if (path.toString().contains("json")) {
                // Without a checkpoint, read every json commit; with a checkpoint, only read the json commits
                // whose index is greater than lastCheckPointIndex.
                if (lastCheckPointIndex < 0 || getDeltaLogIndex(path.toString(), "json") > lastCheckPointIndex) {
                    try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
                        String line = br.readLine();
                        while (line != null) {
                            JSONObject obj = JSON.parseObject(line);
                            JSONObject addObj = obj.getJSONObject("add");
                            JSONObject removeObj = obj.getJSONObject("remove");
                            if (addObj != null) {
                                String addPath = addObj.getString("path");
                                if (StringUtils.isNoneEmpty(addPath) && partitionCond(addPath, partitionPath)) {
                                    afterCheckPointPath.add(addPath);
                                }
                            } else if (removeObj != null) {
                                String removePath = removeObj.getString("path");
                                if (StringUtils.isNoneEmpty(removePath) && partitionCond(removePath, partitionPath)) {
                                    checkPointPath.remove(removePath);
                                    afterCheckPointPath.remove(removePath);
                                }
                            }
                            line = br.readLine();
                        }
                    }
                }
            }
        }
        checkPointPath.addAll(afterCheckPointPath);
        for (String path : checkPointPath) {
            result.add(new Path(tablePath + "/" + path));
        }
        return result;
    }

    /**
     * Check whether _delta_log exists under the table directory, e.g.
     * /user/hive/warehouse/db_name/table_name/_delta_log
     */
    public static boolean isDeltaTable(String path,
                                       FileSystem fs) throws IOException {
        Path deltaLogPath = fs.makeQualified(new Path(cutPartitionPath(path), "_delta_log"));
        return fs.exists(deltaLogPath);
    }

    /**
     * Strip partition directories from the path: /a/b/c=1/d=2 => /a/b
     */
    private static String cutPartitionPath(String path) {
        String lastStr = path.substring(path.lastIndexOf("/") + 1);
        if (lastStr.contains("=")) {
            return cutPartitionPath(path.substring(0, path.lastIndexOf("/")));
        } else {
            return path;
        }
    }

    /**
     * Get the version index of a delta log file from its name.
     */
    private static Long getDeltaLogIndex(String path,
                                         String format) {
        String index;
        if (format.equals("parquet")) {
            index = path.substring(path.indexOf("_delta_log/") + 11, path.indexOf(".checkpoint.parquet"));
        } else {
            index = path.substring(path.indexOf("_delta_log/") + 11, path.indexOf(".json"));
        }
        return Long.parseLong(index);
    }

    /**
     * Check whether a file path belongs to the requested partition (an empty partition matches everything).
     */
    private static boolean partitionCond(String path,
                                         String partition) {
        return (path.contains(partition) && StringUtils.isNoneBlank(partition)) || StringUtils.isBlank(partition);
    }

    /**
     * Extract the live file paths recorded in a checkpoint (parquet) file.
     */
    private static LinkedList<String> getCheckPointPath(String path,
                                                        Configuration conf,
                                                        String partitionPath) {
        LinkedList<String> parquetList = new LinkedList<>();
        if (StringUtils.isNoneEmpty(path)) {
            try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(path)).withConf(conf).build()) {
                Group recordData = reader.read();
                while (recordData != null) {
                    String addPath;
                    String removePath;
                    try {
                        addPath = recordData.getGroup("add", 0).getString("path", 0);
                        if (partitionCond(addPath, partitionPath)) {
                            parquetList.add(addPath);
                        }
                    } catch (RuntimeException ignored) {
                        // Rows that do not carry an 'add' action throw here and are skipped.
                    }
                    try {
                        removePath = recordData.getGroup("remove", 0).getString("path", 0);
                        if (partitionCond(removePath, partitionPath)) {
                            parquetList.remove(removePath);
                        }
                    } catch (RuntimeException ignored) {
                        // Rows that do not carry a 'remove' action throw here and are skipped.
                    }
                    recordData = reader.read();
                }
            } catch (IOException e) {
                LOGGER.error("Failed to read delta parquet checkpoint", e);
            }
        }
        return parquetList;
    }
}
03 Usage
Get all live parquet file paths of a table
input:
DeltaHelper.load("/user/hive/warehouse/db_name/table_name", fs)
output:
/user/hive/warehouse/db_name/table_name/a1.parquet
Get all live parquet file paths of a partition of a table
input:
DeltaHelper.load("/user/hive/warehouse/db_name/table_name/ds=20200101", fs)
output:
/user/hive/warehouse/db_name/table_name/ds=20200101/a1.parquet
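Putting it together, here is a minimal sketch of how the helper might be driven; the FileSystem comes from the default Hadoop configuration, and the paths are the hypothetical ones used above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.List;

public class DeltaHelperExample {
    public static void main(String[] args) throws Exception {
        // Build a FileSystem from the default Hadoop configuration (core-site.xml / hdfs-site.xml on the classpath).
        FileSystem fs = FileSystem.get(new Configuration());
        String tablePath = "/user/hive/warehouse/db_name/table_name";
        if (DeltaHelper.isDeltaTable(tablePath, fs)) {
            // All live parquet files of the table.
            List<Path> tableFiles = DeltaHelper.load(tablePath, fs);
            tableFiles.forEach(System.out::println);
            // All live parquet files of a single partition.
            List<Path> partitionFiles = DeltaHelper.load(tablePath + "/ds=20200101", fs);
            partitionFiles.forEach(System.out::println);
        }
    }
}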
04 Afterword
I can see why the Databricks team positions Delta Lake the way it does, and reading it through Spark has clear advantages at massive data scale, but its current heavy dependence on the Spark ecosystem makes some scenarios awkward to handle.