iceberg 分区是如何读写和维护

简介: 了解iceberg 分区的信息和数据维护,以及分区变更之后对数据读取到影响

一:背景

iceberg 支持对表结构的变更和分区的变更,比较好奇的是如果调整了分区之后,是否会改变数据的存储形式,以及对查询是否有影响;

我们先建一个表:

CREATE TABLE sample7 (

`id` BIGINT,

`data` STRING,

`name` STRING,

`category` STRING)

USING iceberg

PARTITIONED BY (category, name)

TBLPROPERTIES( "format-version"="2")

插入3条数据(2个分区[xc1,pt2] [xc3,pt3])

spark-sql> select * from sample7;

22/08/09 20:17:18 WARN conf.HiveConf: HiveConf of name hive.merge.mapfile does not exist

22/08/09 20:17:18 WARN conf.HiveConf: HiveConf hive.mapjoin.smalltable.filesize expects LONG type value

1 1 xc1 pt2

2 2 xc1 pt2

2 2 xc3 pt3


二:分区信息在元数据信息里面如何维护;

1):分区信息在iceberg snapshot.metadata.json 里如何维护

 { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/30045582-3715-4c5e-9dab-7ef15cd542cf-m0.avro", "manifest_length": 7099, "partition_spec_id": 0, "content": 0, "sequence_number": 2, "min_sequence_number": 2, "added_snapshot_id": 4976264316127381584, "added_data_files_count": 1, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 1, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt3" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc3" }, "upper_bound": { "bytes": "xc3" } } ] } } { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/447e792c-8451-4665-807b-9107200abe47-m0.avro", "manifest_length": 7158, "partition_spec_id": 0, "content": 0, "sequence_number": 1, "min_sequence_number": 1, "added_snapshot_id": 3724364786505115700, "added_data_files_count": 2, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 3, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt2" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc1" }, "upper_bound": { "bytes": "xc3" } } ] } } 

2):manifest-list 如何维护分区信息

manifest-list.avro 中维护了多个manifest 信息,并列存储 ,每个对象为GenericManifestFile

 { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/30045582-3715-4c5e-9dab-7ef15cd542cf-m0.avro", "manifest_length": 7099, "partition_spec_id": 0, "content": 0, "sequence_number": 2, "min_sequence_number": 2, "added_snapshot_id": 4976264316127381584, "added_data_files_count": 1, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 1, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt3" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc3" }, "upper_bound": { "bytes": "xc3" } } ] } } { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/447e792c-8451-4665-807b-9107200abe47-m0.avro", "manifest_length": 7158, "partition_spec_id": 0, "content": 0, "sequence_number": 1, "min_sequence_number": 1, "added_snapshot_id": 3724364786505115700, "added_data_files_count": 2, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 3, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt2" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc1" }, "upper_bound": { "bytes": "xc3" } } ] } } 

3):manifest_path 如何维护分区信息

 { "status": 1, "snapshot_id": { "long": 3724364786505115700 }, "sequence_number": null, "data_file": { "content": 0, "file_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt2/name=xc1/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00001.parquet", "file_format": "PARQUET", "partition": { "category": { "string": "pt2" }, "name": { "string": "xc1" } }, "record_count": 2, "file_size_in_bytes": 1254, "column_sizes": { "array": [ { "key": 1, "value": 55 }, { "key": 2, "value": 55 }, { "key": 3, "value": 97 }, { "key": 4, "value": 97 } ] }, "value_counts": { "array": [ { "key": 1, "value": 2 }, { "key": 2, "value": 2 }, { "key": 3, "value": 2 }, { "key": 4, "value": 2 } ] }, "null_value_counts": { "array": [ { "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 } ] }, "nan_value_counts": { "array": [] }, "lower_bounds": { "array": [ { "key": 1, "value": "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "1" }, { "key": 3, "value": "xc1" }, { "key": 4, "value": "pt2" } ] }, "upper_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc1" }, { "key": 4, "value": "pt2" } ] }, "key_metadata": null, "split_offsets": { "array": [ 4 ] }, "equality_ids": null, "sort_order_id": { "int": 0 } } } { "status": 1, "snapshot_id": { "long": 3724364786505115700 }, "sequence_number": null, "data_file": { "content": 0, "file_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt3/name=xc3/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00002.parquet", "file_format": "PARQUET", "partition": { "category": { "string": "pt3" }, "name": { "string": "xc3" } }, "record_count": 1, "file_size_in_bytes": 1144, "column_sizes": { "array": [ { "key": 1, "value": 51 }, { "key": 2, "value": 51 }, { "key": 3, "value": 54 }, { "key": 4, "value": 54 } ] }, "value_counts": { "array": [ { "key": 1, "value": 1 }, { "key": 2, "value": 1 }, { "key": 3, "value": 1 }, { "key": 4, "value": 1 } ] }, "null_value_counts": { "array": [ { "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 } ] }, "nan_value_counts": { "array": [] }, "lower_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc3" }, { "key": 4, "value": "pt3" } ] }, "upper_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc3" }, { "key": 4, "value": "pt3" } ] }, "key_metadata": null, "split_offsets": { "array": [ 4 ] }, "equality_ids": null, "sort_order_id": { "int": 0 } } } 

4):数据文件parquet 如何维护分区信息

 creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94) extra: iceberg.schema = {"type":"struct","schema-id":0, "fields": [{"id":1,"name":"id","required":false,"type":"long"}, {"id":2,"name":"data","required":false,"type":"string"}, {"id":3,"name":"name","required":false,"type":"string"}, {"id":4,"name":"category","required":false,"type":"string"}]} file schema: table -------------------------------------------------------------------------------- id: OPTIONAL INT64 R:0 D:1 data: OPTIONAL BINARY L:STRING R:0 D:1 name: OPTIONAL BINARY L:STRING R:0 D:1 category: OPTIONAL BINARY L:STRING R:0 D:1 row group 1: RC:2 TS:198 OFFSET:4 -------------------------------------------------------------------------------- id: INT64 GZIP DO:0 FPO:4 SZ:55/45/0.82 VC:2 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 2, num_nulls: 0] data: BINARY GZIP DO:0 FPO:59 SZ:55/39/0.71 VC:2 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 2, num_nulls: 0] name: BINARY GZIP DO:114 FPO:160 SZ:97/57/0.59 VC:2 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: xc1, max: xc1, num_nulls: 0] category: BINARY GZIP DO:211 FPO:257 SZ:97/57/0.59 VC:2 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: pt2, max: pt2, num_nulls: 0] 

三: flink 如何读取带分区的数据,如何过滤掉不在分区内的数据(高效读取)

* 构建IcebergTableSource (public class IcebergTableSource implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, SupportsLimitPushDown

看tablesource 的实现接口包括 投影下推,条件下推,limit 下推 目的是在构建读取的时候尽量减少对文件的scan,获取需要的数据的最小集

* 分片逻辑里面如何处理分区相关字段

image.png

对分区逻辑进行过滤的核心逻辑在

org.apache.iceberg.ManifestGroup#entries(java.util.function.BiFunction>,org.apache.iceberg.io.CloseableIterable>) 中;

image.png

特别关注其中的evalCache 的构造:

image.png

通过manifest 的分区信息(spec) + 过滤条件(dataFilter)构建出ManifestEvaluator;

比如分区name, 查询条件 id= 1 and name = 'xc';

会将条件name 作为分区的过滤判断条件;

* ManifestEvaluator

当前例子中的判断条件是eq

image.png

获取到对应的manifest统计信息根据条件表达式的逻辑判断是否符合目标,不符合就过滤掉即不需要再构建ManifestReader 去读取manifest 文件;

如果满足条件则根据当前manifest 信息读取manifest_path 构建ManifestReader;

image.png

根据条件信息(where id= 1 and name = 'xc')+ 分区列信息(spec)构建出分区过滤信息:lazyEvaluator

privateEvaluatorevaluator() {
if (lazyEvaluator==null) {
Expressionprojected=Projections.inclusive(spec, caseSensitive).project(rowFilter);
ExpressionfinalPartFilter=Expressions.and(projected, partFilter);
if (finalPartFilter!=null) {
this.lazyEvaluator=newEvaluator(spec.partitionType(), finalPartFilter, caseSensitive);
    } else {
this.lazyEvaluator=newEvaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
    }
  }
returnlazyEvaluator;
}

构建完成Evaluator 之后在迭代中调用具体的manifest 信息进行判断过滤


image.png

上面代码信息中

evaluator.eval(entry.file().partition()) 是用来进行分区过滤的;

metricsEvaluator.eval(entry.file())) 是用来根据条件对字段进行判断是否在对应的范围内;

其中分区的对比过滤逻辑是构建条件表达式,传入当前的manifest 中对应的data  的partition 值;

再根据 条件表达式中的操作符,比如例子中的op="EQ" 进行判断;将不符合的parquet 文件过滤掉;


回过头来看下mainfest 文件里面的一个parquet文件对应的信息

image.png

可以看到data_file 属性对应一个parquet文件,并且partition 信息也是具体的一个分区写入一个文件;

比如上面的category=pt2/name=xc1 分区对应一个数据文件:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt2/name=xc1/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00001.parquet



四: 分区变更(比如alter table sample8 add partition field name;)之后维护的分区信息如何变更;

CREATE TABLE sample8 (

`id` BIGINT,

`data` STRING,

`name` STRING,

`category` STRING)

USING iceberg

PARTITIONED BY (category)

TBLPROPERTIES( "format-version"="2")

初始分区为category;

后面调整:alter table sample8 add partition field name;

* 添加完分区之后metadata.json 里的partition 信息变更,但是不生成新的snapshot ==> 说明数据结构文件和manifest 文件都没有调整

* 分区信息读取的时候还是基于原来的avro 描述的分区进行判断(分区为category)

从数据结构上看,基于相同分区的数据会写入相同的目录和文件下(大数据量未测试过,但是猜测也是一个相对聚集的数据)所以在构建分区过滤上只是对更高效的条件进行过滤而已;

image.png

如上图manifest-list 信息所示:

包含三个部分manifest 文件内容:

1:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/fdce5159-40c5-4cc2-ae60-0317165a5576-m0.avro 两个分区

2:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/a56dfef2-4592-4d1d-a642-e16f4467dac2-m0.avro 两个分区

3:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/a56dfef2-4592-4d1d-a642-e16f4467dac2-m1.avro 一个分区

相关文章
|
SQL 消息中间件 关系型数据库
iceberg实践
iceberg实践
|
存储 缓存 分布式计算
Spark任务OOM问题如何解决?
大家好,我是V哥。在实际业务中,Spark任务常因数据量过大、资源分配不合理或代码瓶颈导致OOM(Out of Memory)。本文详细分析了各种业务场景下的OOM原因,并提供了优化方案,包括调整Executor内存和CPU资源、优化内存管理策略、数据切分及减少宽依赖等。通过综合运用这些方法,可有效解决Spark任务中的OOM问题。关注威哥爱编程,让编码更顺畅!
1098 3
|
存储 SQL 分布式计算
Apache Iceberg数据湖基础
Apache Iceberg 是新一代数据湖表格式,旨在解决传统数据湖(如 Hive)在事务性、并发控制和元数据管理上的不足。它支持 Spark、Flink、Trino 等多种计算引擎,提供 ACID 事务、模式演化、分区演化等核心特性,具备良好的云存储兼容性和高性能查询能力,适用于大规模结构化数据分析场景。
1849 0
|
6月前
|
存储 人工智能 分布式计算
阿里云DLF 3.0:面向AI时代的智能全模态湖仓管理平台
在2025年云栖大会,阿里云发布DLF 3.0,升级为面向AI时代的智能全模态湖仓管理平台。支持结构化与非结构化数据统一管理,实现秒级实时处理、智能存储优化与细粒度安全控制,助力企业高效构建Data+AI基础设施。
2077 3
|
存储 人工智能 关系型数据库
4年10亿美金,Neon用Serverless PG证明:AI需要的不是“大”,而是“隐形”
AnalyticDB PostgreSQL 版基于Neon架构隆重推出满足 AI 时代应用开发需求的Serverless版本,并且在这之上搭载了结构化分析、向量检索、BM25全文检索和图检索,通过一套引擎满足 AI 应用丰富的数据诉求,支持MCP和OpenAI协议,为企业全面拥抱 AI 配备了数据存储、分析和应用的 “关键” 能力,帮助企业火箭式启动跑赢时代。
|
人工智能 监控 大数据
大数据未来五大趋势,这些变化你真的准备好了吗?
大数据未来五大趋势,这些变化你真的准备好了吗?
1169 90
|
12月前
|
存储 缓存 分布式计算
StarRocks x Iceberg:云原生湖仓分析技术揭秘与最佳实践
本文将深入探讨基于 StarRocks 和 Iceberg 构建的云原生湖仓分析技术,详细解析两者结合如何实现高效的查询性能优化。内容涵盖 StarRocks Lakehouse 架构、与 Iceberg 的性能协同、最佳实践应用以及未来的发展规划,为您提供全面的技术解读。 作者:杨关锁,北京镜舟科技研发工程师
StarRocks x Iceberg:云原生湖仓分析技术揭秘与最佳实践
|
存储 数据挖掘 Apache
Apache Doris + Iceberg 快速搭建指南|Lakehouse 使用手册(三)
如何在 Docker 环境下快速搭建 Apache Doris + Apache Iceberg 测试 & 演示环境,并展示各功能的使用操作
967 8
Apache Doris + Iceberg 快速搭建指南|Lakehouse 使用手册(三)
|
分布式计算 安全 OLAP
7倍性能提升|阿里云AnalyticDB Spark向量化能力解析
AnalyticDB Spark如何通过向量化引擎提升性能?
|
SQL 存储 关系型数据库