一: Background
Iceberg supports both table schema changes and partition changes. What I was curious about is: after the partition spec is adjusted, does the storage layout of the data change, and is query behavior affected?
Let's start by creating a table:
CREATE TABLE sample7 (
`id` BIGINT,
`data` STRING,
`name` STRING,
`category` STRING)
USING iceberg
PARTITIONED BY (category, name)
TBLPROPERTIES( "format-version"="2")
Insert 3 rows of data (2 partitions: [xc1, pt2] and [xc3, pt3]):
spark-sql> select * from sample7;
22/08/09 20:17:18 WARN conf.HiveConf: HiveConf of name hive.merge.mapfile does not exist
22/08/09 20:17:18 WARN conf.HiveConf: HiveConf hive.mapjoin.smalltable.filesize expects LONG type value
1 1 xc1 pt2
2 2 xc1 pt2
2 2 xc3 pt3
二: How partition information is maintained in the metadata
1): How partition information is maintained in the Iceberg metadata.json
The table metadata file records every partition spec the table has used under "partition-specs" (each spec with its own spec-id and field list) and marks the active one with "default-spec-id"; a snapshot entry itself only points at a manifest list, so the partition details of the actual data live one level down, in the manifest list and the manifest files.
{ "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/30045582-3715-4c5e-9dab-7ef15cd542cf-m0.avro", "manifest_length": 7099, "partition_spec_id": 0, "content": 0, "sequence_number": 2, "min_sequence_number": 2, "added_snapshot_id": 4976264316127381584, "added_data_files_count": 1, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 1, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt3" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc3" }, "upper_bound": { "bytes": "xc3" } } ] } } { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/447e792c-8451-4665-807b-9107200abe47-m0.avro", "manifest_length": 7158, "partition_spec_id": 0, "content": 0, "sequence_number": 1, "min_sequence_number": 1, "added_snapshot_id": 3724364786505115700, "added_data_files_count": 2, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 3, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt2" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc1" }, "upper_bound": { "bytes": "xc3" } } ] } }
2): How the manifest-list maintains partition information
The manifest-list .avro file holds one entry per manifest, stored side by side; each entry is a GenericManifestFile. Its partitions field carries, for every partition column of the manifest's spec, a summary (contains_null, contains_nan, lower_bound, upper_bound) that manifest-level pruning relies on:
{ "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/30045582-3715-4c5e-9dab-7ef15cd542cf-m0.avro", "manifest_length": 7099, "partition_spec_id": 0, "content": 0, "sequence_number": 2, "min_sequence_number": 2, "added_snapshot_id": 4976264316127381584, "added_data_files_count": 1, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 1, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt3" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc3" }, "upper_bound": { "bytes": "xc3" } } ] } } { "manifest_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/metadata/447e792c-8451-4665-807b-9107200abe47-m0.avro", "manifest_length": 7158, "partition_spec_id": 0, "content": 0, "sequence_number": 1, "min_sequence_number": 1, "added_snapshot_id": 3724364786505115700, "added_data_files_count": 2, "existing_data_files_count": 0, "deleted_data_files_count": 0, "added_rows_count": 3, "existing_rows_count": 0, "deleted_rows_count": 0, "partitions": { "array": [ { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "pt2" }, "upper_bound": { "bytes": "pt3" } }, { "contains_null": false, "contains_nan": { "boolean": false }, "lower_bound": { "bytes": "xc1" }, "upper_bound": { "bytes": "xc3" } } ] } }
3): How the manifest file (pointed to by manifest_path) maintains partition information
{ "status": 1, "snapshot_id": { "long": 3724364786505115700 }, "sequence_number": null, "data_file": { "content": 0, "file_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt2/name=xc1/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00001.parquet", "file_format": "PARQUET", "partition": { "category": { "string": "pt2" }, "name": { "string": "xc1" } }, "record_count": 2, "file_size_in_bytes": 1254, "column_sizes": { "array": [ { "key": 1, "value": 55 }, { "key": 2, "value": 55 }, { "key": 3, "value": 97 }, { "key": 4, "value": 97 } ] }, "value_counts": { "array": [ { "key": 1, "value": 2 }, { "key": 2, "value": 2 }, { "key": 3, "value": 2 }, { "key": 4, "value": 2 } ] }, "null_value_counts": { "array": [ { "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 } ] }, "nan_value_counts": { "array": [] }, "lower_bounds": { "array": [ { "key": 1, "value": "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "1" }, { "key": 3, "value": "xc1" }, { "key": 4, "value": "pt2" } ] }, "upper_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc1" }, { "key": 4, "value": "pt2" } ] }, "key_metadata": null, "split_offsets": { "array": [ 4 ] }, "equality_ids": null, "sort_order_id": { "int": 0 } } } { "status": 1, "snapshot_id": { "long": 3724364786505115700 }, "sequence_number": null, "data_file": { "content": 0, "file_path": "hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt3/name=xc3/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00002.parquet", "file_format": "PARQUET", "partition": { "category": { "string": "pt3" }, "name": { "string": "xc3" } }, "record_count": 1, "file_size_in_bytes": 1144, "column_sizes": { "array": [ { "key": 1, "value": 51 }, { "key": 2, "value": 51 }, { "key": 3, "value": 54 }, { "key": 4, "value": 54 } ] }, "value_counts": { "array": [ { "key": 1, "value": 1 }, { "key": 2, "value": 1 }, { "key": 3, "value": 1 }, { "key": 4, "value": 1 } ] }, "null_value_counts": { "array": [ { "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 } ] }, "nan_value_counts": { "array": [] }, "lower_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc3" }, { "key": 4, "value": "pt3" } ] }, "upper_bounds": { "array": [ { "key": 1, "value": "\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000" }, { "key": 2, "value": "2" }, { "key": 3, "value": "xc3" }, { "key": 4, "value": "pt3" } ] }, "key_metadata": null, "split_offsets": { "array": [ 4 ] }, "equality_ids": null, "sort_order_id": { "int": 0 } } }
4): How the Parquet data file maintains partition information
creator:     parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:       iceberg.schema = {"type":"struct","schema-id":0, "fields": [{"id":1,"name":"id","required":false,"type":"long"}, {"id":2,"name":"data","required":false,"type":"string"}, {"id":3,"name":"name","required":false,"type":"string"}, {"id":4,"name":"category","required":false,"type":"string"}]}

file schema: table
--------------------------------------------------------------------------------
id:          OPTIONAL INT64 R:0 D:1
data:        OPTIONAL BINARY L:STRING R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
category:    OPTIONAL BINARY L:STRING R:0 D:1

row group 1: RC:2 TS:198 OFFSET:4
--------------------------------------------------------------------------------
id:          INT64 GZIP DO:0 FPO:4 SZ:55/45/0.82 VC:2 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 2, num_nulls: 0]
data:        BINARY GZIP DO:0 FPO:59 SZ:55/39/0.71 VC:2 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 2, num_nulls: 0]
name:        BINARY GZIP DO:114 FPO:160 SZ:97/57/0.59 VC:2 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: xc1, max: xc1, num_nulls: 0]
category:    BINARY GZIP DO:211 FPO:257 SZ:97/57/0.59 VC:2 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: pt2, max: pt2, num_nulls: 0]

As the footer shows, the Parquet file carries no dedicated partition metadata: the partition source columns name and category are stored as ordinary columns with their own min/max statistics, while the partition values themselves live in the file path (category=pt2/name=xc1) and in the manifest's data_file.partition field.
三: How Flink reads partitioned data and filters out data that is not in the target partitions (efficient reads)
* Building the IcebergTableSource (public class IcebergTableSource implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown, SupportsLimitPushDown)
The table source implements projection pushdown, filter pushdown and limit pushdown; the goal is to scan as few files as possible when building the read and to fetch the minimal set of data required.
* How the split-planning logic handles partition-related fields
The core logic that filters on partitions lives in org.apache.iceberg.ManifestGroup#entries(...);
pay particular attention to how evalCache is constructed there:
a ManifestEvaluator is built from the manifest's partition information (spec) plus the filter condition (dataFilter).
For example, with name as a partition column and the query condition id = 1 and name = 'xc',
the predicate on name is used as the partition-level filter condition.
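This projection step can be reproduced standalone with the same public classes that evalCache uses. A minimal sketch (the Schema and PartitionSpec below simply mirror the sample7 table; the class name is made up for illustration):

import org.apache.iceberg.ManifestEvaluator;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.types.Types;

public class PartitionProjectionSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()),
        Types.NestedField.optional(3, "name", Types.StringType.get()),
        Types.NestedField.optional(4, "category", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .identity("category")
        .identity("name")
        .build();

    // the query condition from the example: id = 1 and name = 'xc'
    Expression rowFilter = Expressions.and(
        Expressions.equal("id", 1L),
        Expressions.equal("name", "xc"));

    // project the row filter onto the partition tuple: only the predicate on the
    // partition column "name" survives, because "id" is not a partition column
    Expression partFilter = Projections.inclusive(spec, true).project(rowFilter);
    System.out.println(partFilter);

    // evalCache builds one of these per partition spec; eval(manifestFile) then answers
    // "can this manifest possibly contain matching rows?" using the partition summaries
    // stored in the manifest list
    ManifestEvaluator evaluator = ManifestEvaluator.forRowFilter(rowFilter, spec, true);
  }
}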
* ManifestEvaluator
In this example the predicate operator is eq.
The evaluator takes the manifest's partition statistics and checks them against the filter expression; if a manifest cannot match, it is filtered out, so no ManifestReader has to be built to read that manifest file.
If it can match, a ManifestReader is built from the manifest's manifest_path.
From the filter condition (where id = 1 and name = 'xc') plus the partition column information (spec), the partition filter lazyEvaluator is built:
private Evaluator evaluator() {
  if (lazyEvaluator == null) {
    Expression projected = Projections.inclusive(spec, caseSensitive).project(rowFilter);
    Expression finalPartFilter = Expressions.and(projected, partFilter);
    if (finalPartFilter != null) {
      this.lazyEvaluator = new Evaluator(spec.partitionType(), finalPartFilter, caseSensitive);
    } else {
      this.lazyEvaluator = new Evaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
    }
  }
  return lazyEvaluator;
}
Once the Evaluator is built, it is applied to each manifest entry during iteration to decide whether the entry is kept.
In the iteration code,
evaluator.eval(entry.file().partition()) performs the partition filtering;
metricsEvaluator.eval(entry.file()) checks whether the file's per-column value ranges can satisfy the condition.
The partition filtering works by binding the filter expression and evaluating it against the partition value recorded for the data file in the current manifest;
the operator of the expression (op = "EQ" in this example) then decides the match, and Parquet files that do not match are filtered out.
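What evaluator.eval(entry.file().partition()) does can be imitated with the public Evaluator class. A minimal sketch (GenericRecord from the iceberg-data module is used here only as a stand-in for the StructLike partition tuple that a manifest entry carries, and the filter uses name = 'xc1' so it matches one of the sample partitions):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

public class PartitionEvaluatorSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()),
        Types.NestedField.optional(3, "name", Types.StringType.get()),
        Types.NestedField.optional(4, "category", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .identity("category").identity("name").build();
    Types.StructType partitionType = spec.partitionType();

    // the projected partition filter, evaluated against each data file's partition tuple
    Evaluator evaluator = new Evaluator(partitionType, Expressions.equal("name", "xc1"), true);

    GenericRecord keep = GenericRecord.create(partitionType);
    keep.setField("category", "pt2");
    keep.setField("name", "xc1");

    GenericRecord prune = GenericRecord.create(partitionType);
    prune.setField("category", "pt3");
    prune.setField("name", "xc3");

    // true  -> the file under category=pt2/name=xc1 is kept
    // false -> the file under category=pt3/name=xc3 is pruned without being read
    System.out.println(evaluator.eval(keep));
    System.out.println(evaluator.eval(prune));
  }
}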
Looking back at the information a manifest file keeps for a single Parquet file,
each data_file entry corresponds to one Parquet file, and its partition field shows that each partition's data is written into its own file;
for example, the category=pt2/name=xc1 partition above corresponds to the data file hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample7/data/category=pt2/name=xc1/00000-6-b0abf41c-de98-4a9c-acb9-8523bd583025-00001.parquet
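At scan-planning level all of the above is wrapped behind the regular TableScan API. A minimal sketch (the Table is assumed to be loaded as in the earlier catalog snippet; the filter values come from the sample data) that prints which Parquet files survive the partition and metrics pruning:

import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class PlanFilesSketch {
  static void planFiles(Table table) throws IOException {
    // only data files whose partition (and column stats) can match the filter are returned;
    // for sample7 this should leave just the file under category=pt2/name=xc1
    try (CloseableIterable<FileScanTask> tasks = table.newScan()
        .filter(Expressions.and(
            Expressions.equal("id", 1L),
            Expressions.equal("name", "xc1")))
        .planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path());
      }
    }
  }
}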
四: How the maintained partition information changes after a partition evolution (for example alter table sample8 add partition field name;)
CREATE TABLE sample8 (
`id` BIGINT,
`data` STRING,
`name` STRING,
`category` STRING)
USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES( "format-version"="2")
The initial partition is category;
later we change it: alter table sample8 add partition field name;
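The same evolution can be done through the Java API, which also makes the effect easy to observe. A minimal sketch (the Table is assumed to be loaded as in the earlier catalog snippet; the class name is made up for illustration):

import org.apache.iceberg.Table;

public class AddPartitionFieldSketch {
  static void addNamePartition(Table table) {
    // equivalent to: alter table sample8 add partition field name
    table.updateSpec()
        .addField("name")
        .commit();

    // a new spec-id appears while the old spec is kept for manifests already written;
    // currentSnapshot() is unchanged because no data or manifest files were rewritten
    System.out.println(table.specs());
    System.out.println(table.currentSnapshot());
  }
}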
* After the partition field is added, the partition information in metadata.json changes (a new partition spec is recorded), but no new snapshot is produced ==> the data files and the manifest files are not rewritten.
* When reading, existing manifests are still evaluated against the partition information described in their original avro files (partitioned by category only).
From the data-layout point of view, rows of the same partition are written into the same directory and files (not verified at large data volumes, but presumably the data stays relatively clustered), so the partition filter is just an additional, more efficient pruning condition when planning the scan.
Looking at the manifest-list information for sample8 after these changes,
it contains three manifest files:
1: hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/fdce5159-40c5-4cc2-ae60-0317165a5576-m0.avro (two partitions)
2: hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/a56dfef2-4592-4d1d-a642-e16f4467dac2-m0.avro (two partitions)
3: hdfs://ns1/dtInsight/hive/warehouse/zdk.db/sample8/metadata/a56dfef2-4592-4d1d-a642-e16f4467dac2-m1.avro (one partition)
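To confirm this from code, one can list the manifests of the current snapshot and print the spec each one was written with. A minimal sketch (the exact allManifests signature varies slightly across Iceberg versions; newer releases take the table's FileIO as shown):

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;

public class ManifestSpecSketch {
  static void printManifestSpecs(Table table) {
    for (ManifestFile m : table.currentSnapshot().allManifests(table.io())) {
      // partitionSpecId() is the spec the manifest was written under;
      // partitions() holds one summary (bounds, null/NaN flags) per partition field of that spec
      System.out.println(m.path()
          + "  spec-id=" + m.partitionSpecId()
          + "  partition-summaries=" + m.partitions().size());
    }
  }
}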