iceberg 分区是如何读写和维护

简介: 了解iceberg 分区的信息和数据维护,以及分区变更之后对数据读取到影响

一:背景

iceberg 支持对表结构的变更和分区的变更,比较好奇的是如果调整了分区之后,是否会改变数据的存储形式,以及对查询是否有影响;

我们先建一个表:

CREATE TABLE sample7 (

`id` BIGINT,

`data` STRING,

`name` STRING,

`category` STRING)

USING iceberg

PARTITIONED BY (category, name)

TBLPROPERTIES( "format-version"="2")

插入3条数据(2个分区[xc1,pt2] [xc3,pt3])

spark-sql> select * from sample7;

22/08/09 20:17:18 WARN conf.HiveConf: HiveConf of name hive.merge.mapfile does not exist

22/08/09 20:17:18 WARN conf.HiveConf: HiveConf hive.mapjoin.smalltable.filesize expects LONG type value

1 1 xc1 pt2

2 2 xc1 pt2

2 2 xc3 pt3


二:分区信息在元数据信息里面如何维护;

1):分区信息在iceberg snapshot.metadata.json 里如何维护

 { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/30045582-3715-4c5e-9dab-7ef15cd542cf-m0.avro", "manifest_length": 7099, "partition_spec_id": 0, "content": 0, "sequence_number": 2, "min_sequence_number": 2, "added_snapshot_id": 4976264316127381584, "added_data_files_count": 1, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 1, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt3" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc3" }, "upper_bound": { "bytes": "xc3" } } ] } } { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/447e792c-8451-4665-807b-9107200abe47-m0.avro", "manifest_length": 7158, "partition_spec_id": 0, "content": 0, "sequence_number": 1, "min_sequence_number": 1, "added_snapshot_id": 3724364786505115700, "added_data_files_count": 2, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 3, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt2" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc1" }, "upper_bound": { "bytes": "xc3" } } ] } } 

2):manifest-list 如何维护分区信息

manifest-list.avro 中维护了多个manifest 信息,并列存储 ,每个对象为GenericManifestFile

 { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/30045582-3715-4c5e-9dab-7ef15cd542cf-m0.avro", "manifest_length": 7099, "partition_spec_id": 0, "content": 0, "sequence_number": 2, "min_sequence_number": 2, "added_snapshot_id": 4976264316127381584, "added_data_files_count": 1, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 1, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt3" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc3" }, "upper_bound": { "bytes": "xc3" } } ] } } { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/447e792c-8451-4665-807b-9107200abe47-m0.avro", "manifest_length": 7158, "partition_spec_id": 0, "content": 0, "sequence_number": 1, "min_sequence_number": 1, "added_snapshot_id": 3724364786505115700, "added_data_files_count": 2, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 3, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt2" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc1" }, "upper_bound": { "bytes": "xc3" } } ] } } 

3):manifest_path 如何维护分区信息

 { "status": 1, "snapshot_id": { "long": 3724364786505115700 }, "sequence_number": null, "data_file": { "content": 0, "file_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt2/name=xc1/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00001.parquet", "file_format": "PARQUET", "partition": { "category": { "string": "pt2" }, "name": { "string": "xc1" } }, "record_count": 2, "file_size_in_bytes": 1254, "column_sizes": { "array": [ { "key": 1, "value": 55 }, { "key": 2, "value": 55 }, { "key": 3, "value": 97 }, { "key": 4, "value": 97 } ] }, "value_counts": { "array": [ { "key": 1, "value": 2 }, { "key": 2, "value": 2 }, { "key": 3, "value": 2 }, { "key": 4, "value": 2 } ] }, "null_value_counts": { "array": [ { "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 } ] }, "nan_value_counts": { "array": [] }, "lower_bounds": { "array": [ { "key": 1, "value": "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "1" }, { "key": 3, "value": "xc1" }, { "key": 4, "value": "pt2" } ] }, "upper_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc1" }, { "key": 4, "value": "pt2" } ] }, "key_metadata": null, "split_offsets": { "array": [ 4 ] }, "equality_ids": null, "sort_order_id": { "int": 0 } } } { "status": 1, "snapshot_id": { "long": 3724364786505115700 }, "sequence_number": null, "data_file": { "content": 0, "file_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt3/name=xc3/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00002.parquet", "file_format": "PARQUET", "partition": { "category": { "string": "pt3" }, "name": { "string": "xc3" } }, "record_count": 1, "file_size_in_bytes": 1144, "column_sizes": { "array": [ { "key": 1, "value": 51 }, { "key": 2, "value": 51 }, { "key": 3, "value": 54 }, { "key": 4, "value": 54 } ] }, "value_counts": { "array": [ { "key": 1, "value": 1 }, { "key": 2, "value": 1 }, { "key": 3, "value": 1 }, { "key": 4, "value": 1 } ] }, "null_value_counts": { "array": [ { "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 } ] }, "nan_value_counts": { "array": [] }, "lower_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc3" }, { "key": 4, "value": "pt3" } ] }, "upper_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc3" }, { "key": 4, "value": "pt3" } ] }, "key_metadata": null, "split_offsets": { "array": [ 4 ] }, "equality_ids": null, "sort_order_id": { "int": 0 } } } 

4):数据文件parquet 如何维护分区信息

 creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94) extra: iceberg.schema = {"type":"struct","schema-id":0, "fields": [{"id":1,"name":"id","required":false,"type":"long"}, {"id":2,"name":"data","required":false,"type":"string"}, {"id":3,"name":"name","required":false,"type":"string"}, {"id":4,"name":"category","required":false,"type":"string"}]} file schema: table -------------------------------------------------------------------------------- id: OPTIONAL INT64 R:0 D:1 data: OPTIONAL BINARY L:STRING R:0 D:1 name: OPTIONAL BINARY L:STRING R:0 D:1 category: OPTIONAL BINARY L:STRING R:0 D:1 row group 1: RC:2 TS:198 OFFSET:4 -------------------------------------------------------------------------------- id: INT64 GZIP DO:0 FPO:4 SZ:55/45/0.82 VC:2 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 2, num_nulls: 0] data: BINARY GZIP DO:0 FPO:59 SZ:55/39/0.71 VC:2 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 2, num_nulls: 0] name: BINARY GZIP DO:114 FPO:160 SZ:97/57/0.59 VC:2 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: xc1, max: xc1, num_nulls: 0] category: BINARY GZIP DO:211 FPO:257 SZ:97/57/0.59 VC:2 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: pt2, max: pt2, num_nulls: 0] 

三: flink 如何读取带分区的数据,如何过滤掉不在分区内的数据(高效读取)

* 构建IcebergTableSource (public class IcebergTableSource implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, SupportsLimitPushDown

看tablesource 的实现接口包括 投影下推,条件下推,limit 下推 目的是在构建读取的时候尽量减少对文件的scan,获取需要的数据的最小集

* 分片逻辑里面如何处理分区相关字段

image.png

对分区逻辑进行过滤的核心逻辑在

org.apache.iceberg.ManifestGroup#entries(java.util.function.BiFunction>,org.apache.iceberg.io.CloseableIterable>) 中;

image.png

特别关注其中的evalCache 的构造:

image.png

通过manifest 的分区信息(spec) + 过滤条件(dataFilter)构建出ManifestEvaluator;

比如分区name, 查询条件 id= 1 and name = 'xc';

会将条件name 作为分区的过滤判断条件;

* ManifestEvaluator

当前例子中的判断条件是eq

image.png

获取到对应的manifest统计信息根据条件表达式的逻辑判断是否符合目标,不符合就过滤掉即不需要再构建ManifestReader 去读取manifest 文件;

如果满足条件则根据当前manifest 信息读取manifest_path 构建ManifestReader;

image.png

根据条件信息(where id= 1 and name = 'xc')+ 分区列信息(spec)构建出分区过滤信息:lazyEvaluator

privateEvaluatorevaluator() {
if (lazyEvaluator==null) {
Expressionprojected=Projections.inclusive(spec, caseSensitive).project(rowFilter);
ExpressionfinalPartFilter=Expressions.and(projected, partFilter);
if (finalPartFilter!=null) {
this.lazyEvaluator=newEvaluator(spec.partitionType(), finalPartFilter, caseSensitive);
    } else {
this.lazyEvaluator=newEvaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
    }
  }
returnlazyEvaluator;
}

构建完成Evaluator 之后在迭代中调用具体的manifest 信息进行判断过滤


image.png

上面代码信息中

evaluator.eval(entry.file().partition()) 是用来进行分区过滤的;

metricsEvaluator.eval(entry.file())) 是用来根据条件对字段进行判断是否在对应的范围内;

其中分区的对比过滤逻辑是构建条件表达式,传入当前的manifest 中对应的data  的partition 值;

再根据 条件表达式中的操作符,比如例子中的op="EQ" 进行判断;将不符合的parquet 文件过滤掉;


回过头来看下mainfest 文件里面的一个parquet文件对应的信息

image.png

可以看到data_file 属性对应一个parquet文件,并且partition 信息也是具体的一个分区写入一个文件;

比如上面的category=pt2/name=xc1 分区对应一个数据文件:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt2/name=xc1/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00001.parquet



四: 分区变更(比如alter table sample8 add partition field name;)之后维护的分区信息如何变更;

CREATE TABLE sample8 (

`id` BIGINT,

`data` STRING,

`name` STRING,

`category` STRING)

USING iceberg

PARTITIONED BY (category)

TBLPROPERTIES( "format-version"="2")

初始分区为category;

后面调整:alter table sample8 add partition field name;

* 添加完分区之后metadata.json 里的partition 信息变更,但是不生成新的snapshot ==> 说明数据结构文件和manifest 文件都没有调整

* 分区信息读取的时候还是基于原来的avro 描述的分区进行判断(分区为category)

从数据结构上看,基于相同分区的数据会写入相同的目录和文件下(大数据量未测试过,但是猜测也是一个相对聚集的数据)所以在构建分区过滤上只是对更高效的条件进行过滤而已;

image.png

如上图manifest-list 信息所示:

包含三个部分manifest 文件内容:

1:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/fdce5159-40c5-4cc2-ae60-0317165a5576-m0.avro 两个分区

2:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/a56dfef2-4592-4d1d-a642-e16f4467dac2-m0.avro 两个分区

3:hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/a56dfef2-4592-4d1d-a642-e16f4467dac2-m1.avro 一个分区

相关文章
|
7月前
|
SQL 消息中间件 数据处理
DataX读取Hive Orc格式表丢失数据处理记录
DataX读取Hive Orc格式表丢失数据处理记录
282 0
|
SQL 存储 Oracle
线上数据问题排查案例分享-因为 HMS 和底层 orc 文件中某字段的数据精度不一致造成的数据丢失问题
线上数据问题排查案例分享-因为 HMS 和底层 orc 文件中某字段的数据精度不一致造成的数据丢失问题
|
4月前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
Doris动态分区表
Doris动态分区表 Doris动态分区表传参
|
2月前
|
调度
Doris给动态分区添加历史分区问题汇总
Doris动态分区表添加历史分区
|
7月前
|
流计算
Flink CDC里关于doris的动态分区问题,对以及建好的动态分区表,可以再次修改历史分区的保留时间嘛?
【1月更文挑战第24天】【1月更文挑战第117篇】Flink CDC里关于doris的动态分区问题,对以及建好的动态分区表,可以再次修改历史分区的保留时间嘛?
199 6
|
7月前
|
SQL 存储 传感器
Hive中的分区表和非分区表有什么区别?请解释其作用和使用场景。
Hive中的分区表和非分区表有什么区别?请解释其作用和使用场景。
245 0
|
7月前
|
SQL 存储 HIVE
Hive中的分桶表是什么?请解释其作用和使用场景。
Hive中的分桶表是什么?请解释其作用和使用场景。
254 0
|
SQL 调度 HIVE
flink 读取hudi 表元数据信息
flink 如何获取hudi 表的元数据信息
flink 读取hudi 表元数据信息
|
存储 缓存 负载均衡
Hbase的Rowkey设计以及如何进行预分区
今天有人问我Hbase的rowkey设计和预分区的问题,这篇文字就简单介绍一下.,关于Hbase的表的一些基本概念这里就不说了,直接说重点,尽可能说的简单一点,废话就不写了. 1.什么是Rowkey? 我们知道Hbase是一个分布式的、面向列的数据库,它和一般关系型数据库的最大区别是:HBase很适合于存储非结构化的数据,还有就是它基于列的而不是基于行的模式.
Hbase的Rowkey设计以及如何进行预分区