1. Executing the CREATE TABLE statement
CREATE TABLE IF NOT EXISTS iceberg_hive.db1.sink6 ( order_id INTEGER COMMENT 'unique id', `order_date` DATE , `order_time` TIMESTAMP(3), `quantity` INT , `product_id` INT , `purchaser` STRING, primary key(order_id) NOT ENFORCED ) with ('format-version' = '2')
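As a sketch (not part of the original walkthrough), the DDL above could be submitted from a Flink job roughly as follows. The catalog name matches the statement, but the 'uri' and 'warehouse' catalog properties are assumptions and depend on the actual Hive metastore deployment.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateIcebergTable {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Register the Hive-backed Iceberg catalog referenced as iceberg_hive in the DDL.
    tEnv.executeSql(
        "CREATE CATALOG iceberg_hive WITH ("
            + " 'type'='iceberg',"
            + " 'catalog-type'='hive',"
            + " 'uri'='thrift://metastore-host:9083',"        // hypothetical metastore URI
            + " 'warehouse'='hdfs://ns1/user/hive/warehouse'"  // hypothetical warehouse path
            + ")");

    // Submitting the CREATE TABLE eventually reaches BaseMetastoreCatalogTableBuilder#create,
    // which is analyzed below.
    tEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS iceberg_hive.db1.sink6 ("
            + " order_id INTEGER COMMENT 'unique id',"
            + " order_date DATE, order_time TIMESTAMP(3),"
            + " quantity INT, product_id INT, purchaser STRING,"
            + " PRIMARY KEY (order_id) NOT ENFORCED"
            + ") WITH ('format-version'='2')");
  }
}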
- Look up the catalog iceberg_hive
- Get the corresponding table information from the Iceberg catalog cache; the create path is org.apache.iceberg.BaseMetastoreCatalog.BaseMetastoreCatalogTableBuilder#create
public Table create() {
  TableOperations ops = BaseMetastoreCatalog.this.newTableOps(this.identifier);
  if (ops.current() != null) {
    throw new AlreadyExistsException("Table already exists: %s", new Object[]{this.identifier});
  } else {
    String baseLocation = this.location != null
        ? this.location
        : BaseMetastoreCatalog.this.defaultWarehouseLocation(this.identifier);
    Map<String, String> properties = this.propertiesBuilder.build();
    TableMetadata metadata = TableMetadata.newTableMetadata(
        this.schema, this.spec, this.sortOrder, baseLocation, properties);

    try {
      ops.commit((TableMetadata) null, metadata);
    } catch (CommitFailedException var6) {
      throw new AlreadyExistsException("Table was created concurrently: %s", new Object[]{this.identifier});
    }

    return new BaseTable(ops, BaseMetastoreCatalog.fullTableName(BaseMetastoreCatalog.this.name(), this.identifier));
  }
}
- Build the TableOperations (the SPI interface that abstracts table metadata access and updates)
- TableOperations.current() resolves the metadataLocation and parses metadata.json
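For reference, a minimal sketch (an assumption, not code quoted from Iceberg) of driving the same catalog -> TableOperations -> metadata path through the Iceberg Java API; the metastore URI and warehouse path are placeholders.

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

public class LoadTableExample {
  public static void main(String[] args) {
    // Hypothetical connection properties; adjust to the actual metastore and warehouse.
    Map<String, String> props = new HashMap<>();
    props.put("uri", "thrift://metastore-host:9083");
    props.put("warehouse", "hdfs://ns1/user/hive/warehouse");

    HiveCatalog catalog = new HiveCatalog();
    catalog.initialize("iceberg_hive", props);

    // loadTable builds the table's TableOperations, and ops.current() reads and parses the
    // current metadata.json, which is exactly what the next section walks through.
    Table table = catalog.loadTable(TableIdentifier.of("db1", "sink6"));
    System.out.println(table.schema());
    System.out.println(table.currentSnapshot());
  }
}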
2. Parsing metadata.json
org.apache.iceberg.TableMetadataParser#fromJson(FileIO io, InputFile file, JsonNode node)
- The JsonNode is built from the data stream read from metadata.json
Its contents include:
{ "format-version":2, "table-uuid":"90dc63e6-7359-45f3-aa65-31bfb30d4a70", "location":"hdfs://ns1/user/hive/warehouse/luna.db/sink6", "last-sequence-number":2, "last-updated-ms":1648709808166, "last-column-id":6, "current-schema-id":0, "schemas":[ { "type":"struct", "schema-id":0, "identifier-field-ids":[ 1 ], "fields":[ { "id":1, "name":"order_id", "required":true, "type":"int" }, { "id":2, "name":"order_date", "required":false, "type":"date" }, { "id":3, "name":"order_time", "required":false, "type":"timestamp" }, { "id":4, "name":"quantity", "required":false, "type":"int" }, { "id":5, "name":"product_id", "required":false, "type":"int" }, { "id":6, "name":"purchaser", "required":false, "type":"string" } ] } ], "default-spec-id":0, "partition-specs":[ { "spec-id":0, "fields":[ ] } ], "last-partition-id":999, "default-sort-order-id":0, "sort-orders":[ { "order-id":0, "fields":[ ] } ], "properties":{ }, "current-snapshot-id":6397021693615244286, "snapshots":[ { "sequence-number":1, "snapshot-id":586540949995254526, "timestamp-ms":1648709717719, "summary":{ "operation":"append", "flink.job-id":"7e6f454848bd1f18edd776a4b5382547", "flink.max-committed-checkpoint-id":"1", "added-data-files":"4", "added-records":"10", "added-files-size":"7569", "changed-partition-count":"1", "total-records":"10", "total-files-size":"7569", "total-data-files":"4", "total-delete-files":"0", "total-position-deletes":"0", "total-equality-deletes":"0" }, "manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-586540949995254526-1-2d77afb5-1afa-4186-bb9a-796c33ecbf2b.avro", "schema-id":0 }, { "sequence-number":2, "snapshot-id":6397021693615244286, "parent-snapshot-id":586540949995254526, "timestamp-ms":1648709808166, "summary":{ "operation":"overwrite", "flink.job-id":"7e6f454848bd1f18edd776a4b5382547", "flink.max-committed-checkpoint-id":"4", "added-data-files":"1", "added-delete-files":"1", "added-records":"1", "added-files-size":"3474", "added-equality-deletes":"1", "changed-partition-count":"1", "total-records":"11", "total-files-size":"11043", "total-data-files":"5", "total-delete-files":"1", "total-position-deletes":"0", "total-equality-deletes":"1" }, "manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-6397021693615244286-1-d3000ce2-fd59-4222-b8f5-6a9cc2a28399.avro", "schema-id":0 } ], "snapshot-log":[ { "timestamp-ms":1648709717719, "snapshot-id":586540949995254526 }, { "timestamp-ms":1648709808166, "snapshot-id":6397021693615244286 } ], "metadata-log":[ { "timestamp-ms":1648709686421, "metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00000-c9bcda85-ed5e-4582-89dd-c019b1771d1b.metadata.json" }, { "timestamp-ms":1648709717719, "metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00001-a369e769-6043-49ea-9051-f03f87ff51fe.metadata.json" } ] }
- A BaseSnapshot is built for each entry under the snapshots node above; at this point the TableMetadata is logically complete;
- Table creation is done, which also means the table metadata refresh is done;
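A small sketch, assuming direct HDFS access, of parsing such a metadata.json with TableMetadataParser, which is essentially what TableOperations.current() does under the hood; the metadata file name below is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.hadoop.HadoopFileIO;

public class ReadMetadataExample {
  public static void main(String[] args) {
    HadoopFileIO io = new HadoopFileIO(new Configuration());
    // Hypothetical metadata file name; in practice it comes from the catalog's metadataLocation.
    String location = "hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00002-example.metadata.json";

    TableMetadata metadata = TableMetadataParser.read(io, location);
    System.out.println(metadata.currentSnapshot().snapshotId()); // 6397021693615244286 in the JSON above
    System.out.println(metadata.schema());
  }
}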
3. So when are a BaseSnapshot's dataManifests and deleteManifests actually loaded and set?
This mainly happens in the split planning logic of Iceberg's FlinkInputFormat;
* The split planning logic is implemented in org.apache.iceberg.flink.source.FlinkSplitGenerator#createInputSplits
The rough flow:
(1) Build a TableScan (implementation chain: DataTableScan -> BaseTableScan -> TableScan)
TableScan definition: API for configuring a table scan. TableScan objects are immutable and can be shared between threads. Refinement methods, like select(Collection) and filter(Expression), create new TableScan instances.
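A minimal sketch of exercising this planning through the public TableScan API; the filter column and value are illustrative, not from the article. Each FileScanTask pairs a data file with the delete files that apply to it.

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class PlanScanExample {
  static void planScan(Table table) throws Exception {
    // Refinement methods return new immutable TableScan instances.
    TableScan scan = table.newScan().filter(Expressions.greaterThan("order_id", 5));
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path() + " deletes=" + task.deletes().size());
      }
    }
  }
}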
(2) org.apache.iceberg.DataTableScan#planFiles builds a ManifestGroup; note snapshot.dataManifests() ==> this is where the concrete data file information is obtained;
public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
                                                 Expression rowFilter, boolean ignoreResiduals,
                                                 boolean caseSensitive, boolean colStats) {
  ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
      .caseSensitive(caseSensitive)
      .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
      .filterData(rowFilter)
      .specsById(ops.current().specsById())
      .ignoreDeleted();

  if (ignoreResiduals) {
    manifestGroup = manifestGroup.ignoreResiduals();
  }

  if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.dataManifests().size() > 1) {
    manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
  }

  return manifestGroup.planFiles();
}
(3) In the code above, the calls to snapshot.dataManifests() and snapshot.deleteManifests() are what actually load the manifest information. The loading happens in org.apache.iceberg.BaseSnapshot#cacheManifests, which reads the manifest list at manifestListLocation; whether an entry is a data manifest or a delete manifest is decided by the manifest's content field;
private void cacheManifests() {
  if (allManifests == null) {
    // if manifests isn't set, then the snapshotFile is set and should be read to get the list
    this.allManifests = ManifestLists.read(io.newInputFile(manifestListLocation));
  }

  if (dataManifests == null || deleteManifests == null) {
    this.dataManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
        manifest -> manifest.content() == ManifestContent.DATA));
    this.deleteManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
        manifest -> manifest.content() == ManifestContent.DELETES));
  }
}
The ManifestFile entries read from that Avro manifest list describe each manifest (path, content type, sequence numbers, added/existing/deleted file counts, partition summaries); a sketch of walking them down to the data files is shown below:
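A sketch, assuming the no-argument Snapshot.dataManifests() used in the quoted code (newer Iceberg versions take a FileIO argument), of listing the data files behind the current snapshot via the same manifest-list -> manifest -> data-file chain.

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class ListDataFiles {
  static void printDataFiles(Table table) throws Exception {
    Snapshot snapshot = table.currentSnapshot();
    // dataManifests() triggers cacheManifests(): it reads the snap-*.avro manifest list and
    // splits the entries into data and delete manifests by their content type.
    for (ManifestFile manifest : snapshot.dataManifests()) {
      try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, table.io())) {
        for (DataFile file : files) {
          System.out.println(file.path() + " records=" + file.recordCount());
        }
      }
    }
  }
}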
(4) Now focus on manifestGroup.planFiles()
* Build the delete file index: DeleteFileIndex.build() reads the delete file entries and sorts them into eqFilesSortedBySeq and posFilesSortedBySeq;
The classification criterion is the content field recorded for each (.parquet) delete file:
public enum FileContent {
DATA(0),
POSITION_DELETES(1),
EQUALITY_DELETES(2);
}
The relevant code is as follows:
if (specsById.get(partition.first()).isUnpartitioned()) {
  Preconditions.checkState(globalDeletes == null, "Detected multiple partition specs with no partitions");

  List<Pair<Long, DeleteFile>> eqFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
      .filter(entry -> entry.file().content() == FileContent.EQUALITY_DELETES)
      // a delete file is indexed by the sequence number it should be applied to
      .map(entry -> Pair.of(entry.sequenceNumber() - 1, entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());
  globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
  globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

  List<Pair<Long, DeleteFile>> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
      .filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
      .map(entry -> Pair.of(entry.sequenceNumber(), entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());
  long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
  DeleteFile[] files = posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

  sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
}
For equality_deletes the indexed seqNum is entry.sequenceNumber() - 1, while for position_deletes it is entry.sequenceNumber(). The reason is that an equality delete only applies to data files with a strictly smaller (older) sequence number, whereas a position delete also applies to data files written at the same sequence number; shifting equality deletes by one lets both kinds be looked up with the same comparison (see the illustrative sketch below).
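Illustrative only (not Iceberg code): assuming the index lookup keeps delete files whose indexed sequence number is greater than or equal to the data file's sequence number (the behavior of limitBySequenceNumber), the seq - 1 shift for equality deletes yields the intended semantics.

public class DeleteSequenceExample {
  static boolean deleteApplies(long indexedSeq, long dataFileSeq) {
    return indexedSeq >= dataFileSeq;
  }

  public static void main(String[] args) {
    // An equality delete committed at sequence number 5 is indexed at 4:
    System.out.println(deleteApplies(4, 5)); // false -> does not hit data written in the same commit
    System.out.println(deleteApplies(4, 4)); // true  -> hits data with a strictly older sequence number
    // A position delete committed at sequence number 5 is indexed at 5:
    System.out.println(deleteApplies(5, 5)); // true  -> also hits data from the same commit
  }
}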
- org.apache.iceberg.DeleteFileIndex#forDataFile(long sequenceNumber, DataFile file)
(1) Look up the DeleteFile[] for the data file's partition;
(2) Attach the matching delete files to the DataFile;
DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
  Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
  Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);

  Stream<DeleteFile> matchingDeletes;
  if (partitionDeletes == null) {
    matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
  } else if (globalDeletes == null) {
    matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
  } else {
    matchingDeletes = Stream.concat(
        limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
        limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
  }

  return matchingDeletes
      .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
      .toArray(DeleteFile[]::new);
}
(3) Decide whether a DataFile and a DeleteFile can actually be related: org.apache.iceberg.DeleteFileIndex#canContainEqDeletesForFile
* Get the value range of each field of the DataFile; the data file's metrics record this information (lowerBounds, upperBounds)
* Get the value range of each field of the DeleteFile; its metrics record the same information (lowerBounds, upperBounds)
* For the fields returned by deleteFile.equalityFieldIds(), compare the range bounds of the DeleteFile against those of the DataFile; the comparison logic is in org.apache.iceberg.DeleteFileIndex#rangesOverlap(PrimitiveType type, ByteBuffer dataLowerBuf, ByteBuffer dataUpperBuf, ByteBuffer deleteLowerBuf, ByteBuffer deleteUpperBuf)
-- Decode the ByteBuffer bounds into typed values according to the field type;
-- The ranges overlap iff: comparator.compare(deleteLower, dataUpper) <= 0 && comparator.compare(dataLower, deleteUpper) <= 0;
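A minimal sketch of that overlap test using plain int bounds instead of the ByteBuffer-encoded lowerBounds/upperBounds that Iceberg stores per column; the order_id ranges are made-up example values.

public class RangeOverlapExample {
  static boolean rangesOverlap(int dataLower, int dataUpper, int deleteLower, int deleteUpper) {
    // Same shape as the Iceberg check: the ranges overlap iff neither lies entirely below the other.
    return deleteLower <= dataUpper && dataLower <= deleteUpper;
  }

  public static void main(String[] args) {
    // data file order_id in [1, 10], equality-delete file order_id in [8, 12] -> the delete file
    // may contain deletes for this data file, so it gets attached to the scan task.
    System.out.println(rangesOverlap(1, 10, 8, 12));  // true
    System.out.println(rangesOverlap(1, 10, 20, 30)); // false -> delete file can be skipped
  }
}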
(5) Build a BaseFileScanTask from the data file identified above together with its associated delete files:
private final DataFile file;
private final DeleteFile[] deletes;
private final String schemaString;
private final String specString;
private final ResidualEvaluator residuals;
private transient PartitionSpec spec = null;
(6) Wrap the resulting scan tasks into FlinkInputSplit instances
4. FlinkInputFormat reads the splits
(1) The input format opens its assigned split:
public void open(FlinkInputSplit split)
The main data-reading iteration logic lives in org.apache.iceberg.flink.source.DataIterator:
(2) RowDataFileScanTaskReader
public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
  Schema partitionSchema = TypeUtil.select(this.projectedSchema, task.spec().identitySourceIds());
  Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty()
      ? ImmutableMap.of()
      : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);

  RowDataFileScanTaskReader.FlinkDeleteFilter deletes =
      new RowDataFileScanTaskReader.FlinkDeleteFilter(task, this.tableSchema, this.projectedSchema, inputFilesDecryptor);
  CloseableIterable<RowData> iterable =
      deletes.filter(this.newIterable(task, deletes.requiredSchema(), (Map) idToConstant, inputFilesDecryptor));

  if (!this.projectedSchema.sameSchema(deletes.requiredSchema())) {
    RowDataProjection rowDataProjection = RowDataProjection.create(
        deletes.requiredRowType(), deletes.requiredSchema().asStruct(), this.projectedSchema.asStruct());
    Objects.requireNonNull(rowDataProjection);
    iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
  }

  return iterable.iterator();
}
Here deletes.filter filters the DataFile rows, dropping the ones matched by the DeleteFile contents (open question: how is this handled when the DeleteFile data is very large?). A sketch of driving this read path from a Flink job is shown below.
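Finally, a sketch (an assumption, based on the pre-FLIP-27 FlinkSource builder) of driving the whole read path, split planning included, from a Flink job; the metastore URI and warehouse path are placeholders.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadSink6 {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Map<String, String> props = new HashMap<>();
    props.put("uri", "thrift://metastore-host:9083");         // hypothetical metastore URI
    props.put("warehouse", "hdfs://ns1/user/hive/warehouse"); // hypothetical warehouse path
    CatalogLoader catalogLoader = CatalogLoader.hive("iceberg_hive", new Configuration(), props);
    TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db1", "sink6"));

    // Batch read of the current snapshot; streaming(true) would instead tail new snapshots.
    // Internally this plans FlinkInputSplits via FlinkSplitGenerator and opens each split
    // with FlinkInputFormat / DataIterator as described above.
    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(false)
        .build();

    stream.print();
    env.execute("read iceberg sink6");
  }
}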