flink 读取iceberg 表数据流程

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink 如何读取iceberg 表数据,包括删除的数据文件如何合并

一.执行建立表语句

CREATE TABLE IF NOT EXISTS iceberg_hive.db1.sink6 (
  order_id INTEGER COMMENT 'unique id',
  `order_date` DATE ,
  `order_time` TIMESTAMP(3),
  `quantity` INT ,
  `product_id` INT ,
  `purchaser` STRING,
  primary key(order_id)  NOT ENFORCED
  ) with ('format-version' = '2')
  1. 获取catalog iceberg_hvie
  2. 从icebergCatalog cache 获取对应的表信息,org.apache.iceberg.BaseMetastoreCatalog.BaseMetastoreCatalogTableBuilder#create
publicTablecreate() {
TableOperationsops=BaseMetastoreCatalog.this.newTableOps(this.identifier);
if (ops.current() !=null) {
thrownewAlreadyExistsException("Table already exists: %s", newObject[]{this.identifier});
    } else {
StringbaseLocation=this.location!=null?this.location : BaseMetastoreCatalog.this.defaultWarehouseLocation(this.identifier);
Map<String, String>properties=this.propertiesBuilder.build();
TableMetadatametadata=TableMetadata.newTableMetadata(this.schema, this.spec, this.sortOrder, baseLocation, properties);
try {
ops.commit((TableMetadata)null, metadata);
        } catch (CommitFailedExceptionvar6) {
thrownewAlreadyExistsException("Table was created concurrently: %s", newObject[]{this.identifier});
        }
returnnewBaseTable(ops, BaseMetastoreCatalog.fullTableName(BaseMetastoreCatalog.this.name(), this.identifier));
    }
}
  1. 构建TableOperations(SPI interface to abstract table metadata access and updates.)
  2. TableOperations.current() 获取metadataLocation 位置并解析metadata.json


二.解析metadata.json

org.apache.iceberg.TableMetadataParser#fromJson(FileIO io, InputFile file, JsonNode node)

  1. JsonNode 是从metadata.json 获取到的数据流构建的json 信息

内容包括:

{
"format-version":2,
"table-uuid":"90dc63e6-7359-45f3-aa65-31bfb30d4a70",
"location":"hdfs://ns1/user/hive/warehouse/luna.db/sink6",
"last-sequence-number":2,
"last-updated-ms":1648709808166,
"last-column-id":6,
"current-schema-id":0,
"schemas":[
        {
"type":"struct",
"schema-id":0,
"identifier-field-ids":[
1            ],
"fields":[
                {
"id":1,
"name":"order_id",
"required":true,
"type":"int"                },
                {
"id":2,
"name":"order_date",
"required":false,
"type":"date"                                       },
                {
"id":3,
"name":"order_time",
"required":false,
"type":"timestamp"                },
                {
"id":4,
"name":"quantity",
"required":false,
"type":"int"                },
                {
"id":5,
"name":"product_id",
"required":false,
"type":"int"                },
                {
"id":6,
"name":"purchaser",
"required":false,
"type":"string"                }
            ]
        }
    ],
"default-spec-id":0,
"partition-specs":[
        {
"spec-id":0,
"fields":[
            ]
        }
    ],
"last-partition-id":999,
"default-sort-order-id":0,
"sort-orders":[
        {
"order-id":0,
"fields":[
            ]
        }
    ],
"properties":{
    },
"current-snapshot-id":6397021693615244286,
"snapshots":[
        {
"sequence-number":1,
"snapshot-id":586540949995254526,
"timestamp-ms":1648709717719,
"summary":{
"operation":"append",
"flink.job-id":"7e6f454848bd1f18edd776a4b5382547",
"flink.max-committed-checkpoint-id":"1",
"added-data-files":"4",
"added-records":"10",
"added-files-size":"7569",
"changed-partition-count":"1",
"total-records":"10",
"total-files-size":"7569",
"total-data-files":"4",
"total-delete-files":"0",
"total-position-deletes":"0",
"total-equality-deletes":"0"            },
"manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-586540949995254526-1-2d77afb5-1afa-4186-bb9a-796c33ecbf2b.avro",
"schema-id":0        },
        {
"sequence-number":2,
"snapshot-id":6397021693615244286,
"parent-snapshot-id":586540949995254526,
"timestamp-ms":1648709808166,
"summary":{
"operation":"overwrite",
"flink.job-id":"7e6f454848bd1f18edd776a4b5382547",
"flink.max-committed-checkpoint-id":"4",
"added-data-files":"1",
"added-delete-files":"1",
"added-records":"1",
"added-files-size":"3474",
"added-equality-deletes":"1",
"changed-partition-count":"1",
"total-records":"11",
"total-files-size":"11043",
"total-data-files":"5",
"total-delete-files":"1",
"total-position-deletes":"0",
"total-equality-deletes":"1"            },
"manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-6397021693615244286-1-d3000ce2-fd59-4222-b8f5-6a9cc2a28399.avro",
"schema-id":0        }
    ],
"snapshot-log":[
        {
"timestamp-ms":1648709717719,
"snapshot-id":586540949995254526        },
        {
"timestamp-ms":1648709808166,
"snapshot-id":6397021693615244286        }
    ],
"metadata-log":[
        {
"timestamp-ms":1648709686421,
"metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00000-c9bcda85-ed5e-4582-89dd-c019b1771d1b.metadata.json"        },
        {
"timestamp-ms":1648709717719,
"metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00001-a369e769-6043-49ea-9051-f03f87ff51fe.metadata.json"        }
    ]
}
  1. 根据上面信息中的snapshot 节点构建BaseSnapshot ,从逻辑上讲当前构建TableMetadata 完成;
  2. 建立表完成,也就是表元数据刷新完成;


三:那BaseSnapshot 的dataManifests,deleteManifets 什么时候获取和设置呢

该部分内容主要在Iceberg 的FlinkInputFormat 的分片逻辑里面;

* 分配逻辑主要是在 org.apache.iceberg.flink.source.FlinkSplitGenerator#createInputSplits 中实现

大致逻辑:

(1) 构建TableScan(实现类DataTableScan -> BaseTableScan -> TableScan)

TableScan 定义:API for configuring a table scan.TableScan objects are immutable and can be shared between threads. Refinement methods, like select(Collection) and filter(Expression), create new TableScan instances.

(2) 通过org.apache.iceberg.DataTableScan#planFiles 读取manifestGroup,关注snapshot.dataManifests()===》从数据文件获取具体的文件信息;

@OverridepublicCloseableIterable<FileScanTask>planFiles(TableOperationsops, Snapshotsnapshot,
ExpressionrowFilter, booleanignoreResiduals,
booleancaseSensitive, booleancolStats) {
ManifestGroupmanifestGroup=newManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
      .caseSensitive(caseSensitive)
      .select(colStats?SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
      .filterData(rowFilter)
      .specsById(ops.current().specsById())
      .ignoreDeleted();
if (ignoreResiduals) {
manifestGroup=manifestGroup.ignoreResiduals();
  }
if (PLAN_SCANS_WITH_WORKER_POOL&&snapshot.dataManifests().size() >1) {
manifestGroup=manifestGroup.planWith(ThreadPools.getWorkerPool());
  }
returnmanifestGroup.planFiles();
}

(3) 在上面的代码中snapshot.dataManifests(), snapshot.deleteManifests() 这两处的调用会获取具体的manifest信息,具体获取信息的过程是在:org.apache.iceberg.BaseSnapshot#cacheManifests 中读取具体的manifestListLocation 获取manifest信息,其中判断文件的类型是通过文件的content 内容来判断是data 文件还是deletes 文件;


privatevoidcacheManifests() {
if (allManifests==null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the listthis.allManifests=ManifestLists.read(io.newInputFile(manifestListLocation));
  }
if (dataManifests==null||deleteManifests==null) {
this.dataManifests=ImmutableList.copyOf(Iterables.filter(allManifests,
manifest->manifest.content() ==ManifestContent.DATA));
this.deleteManifests=ImmutableList.copyOf(Iterables.filter(allManifests,
manifest->manifest.content() ==ManifestContent.DELETES));
  }
}

读取avro 文件获取到的manifestFile 的信息如下:

image.png

image.png


(4) 重点关注下manifestGroup.planFiles()

*  构建deleteFiles . DeleteFileIndex.build(); 读取的deleteFile 信息;判断是eqFilesSortedBySeq, posFilesSortedBySeq;

判断条件:.parquet文件的content 信息:

public enum FileContent {

DATA(0),

POSITION_DELETES(1),

EQUALITY_DELETES(2);

}

具体代码如下:

if (specsById.get(partition.first()).isUnpartitioned()) {
Preconditions.checkState(globalDeletes==null, "Detected multiple partition specs with no partitions");
List<Pair<Long, DeleteFile>>eqFilesSortedBySeq=deleteFilesByPartition.get(partition).stream()
      .filter(entry->entry.file().content() ==FileContent.EQUALITY_DELETES)
      .map(entry->// a delete file is indexed by the sequence number it should be applied toPair.of(entry.sequenceNumber() -1, entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());
globalApplySeqs=eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes=eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
List<Pair<Long, DeleteFile>>posFilesSortedBySeq=deleteFilesByPartition.get(partition).stream()
      .filter(entry->entry.file().content() ==FileContent.POSITION_DELETES)
      .map(entry->Pair.of(entry.sequenceNumber(), entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());
long[] seqs=posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files=posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
}

在equality_deletes 里 设定 的seqNum 是 entry.sequenceNumber() - 1;而position_deletes 里 设定的seqNum 是entry.sequenceNumber();原因是xxx

  • org.apache.iceberg.DeleteFileIndex#forDataFile(long sequenceNumber, DataFile file)

 (1) 获取分区对应的DeleteFile[];

 (2) 关联DataFile 对应的删除文件;

DeleteFile[] forDataFile(longsequenceNumber, DataFilefile) {
Pair<Integer, StructLikeWrapper>partition=partition(file.specId(), file.partition());
Pair<long[], DeleteFile[]>partitionDeletes=sortedDeletesByPartition.get(partition);
Stream<DeleteFile>matchingDeletes;
if (partitionDeletes==null) {
matchingDeletes=limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
    } elseif (globalDeletes==null) {
matchingDeletes=limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
    } else {
matchingDeletes=Stream.concat(
limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
    }
returnmatchingDeletes        .filter(deleteFile->canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
        .toArray(DeleteFile[]::new);
  }

(3)判断DataFile 和 DeleteFile 是否存在关联org.apache.iceberg.DeleteFileIndex#canContainEqDeletesForFile

   * 获取DataFile 各个字段值的范围-DataFile 文件中记录了对应的信息(lowerBounds,upperBounds)

   * 获取DeleteFile 各个字段值的范围-DeleteFile 文件中记录了对应的信息(lowerBounds,upperBounds)

  *  比较deleteFile.equalityFieldIds() 获取到的字段在DeleteFile,DataFile 中的值的范围边界;   具体的比较逻辑参考:org.apache.iceberg.DeleteFileIndex#rangesOverlap(PrimitiveType type, ByteBuffer dataLowerBuf, ByteBuffer dataUpperBuf, ByteBuffer deleteLowerBuf, ByteBuffer deleteUpperBuf)

--根据字段的类型转换出对应的ByteBuffer的值;

-- 判断逻辑是否存在交集:comparator.compare(deleteLower, dataUpper) <= 0 && comparator.compare(dataLower, deleteUpper) <= 0;

(5)构建出BaseFileScanTask : 上面已经判断出对应的data 文件,和关联的Delete文件

private final DataFile file;
private final DeleteFile[] deletes;
private final String schemaString;
private final String specString;
private final ResidualEvaluator residuals;
private transient PartitionSpec spec = null;

image.png

(6)构建完成FlinkInputSplit

image.png

三:FlinkInputFormat 读取分片信息
(1) input读取到对应的分片:

public void open(FlinkInputSplit split)

主要的读取数据迭代逻辑在org.apache.iceberg.flink.source.DataIterator中:

(2)RowDataFileScanTaskReader

publicCloseableIterator<RowData>open(FileScanTasktask, InputFilesDecryptorinputFilesDecryptor) {
SchemapartitionSchema=TypeUtil.select(this.projectedSchema, task.spec().identitySourceIds());
Map<Integer, ?>idToConstant=partitionSchema.columns().isEmpty() ?ImmutableMap.of() : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
RowDataFileScanTaskReader.FlinkDeleteFilterdeletes=newRowDataFileScanTaskReader.FlinkDeleteFilter(task, this.tableSchema, this.projectedSchema, inputFilesDecryptor);
CloseableIterable<RowData>iterable=deletes.filter(this.newIterable(task, deletes.requiredSchema(), (Map)idToConstant, inputFilesDecryptor));
if (!this.projectedSchema.sameSchema(deletes.requiredSchema())) {
RowDataProjectionrowDataProjection=RowDataProjection.create(deletes.requiredRowType(), deletes.requiredSchema().asStruct(), this.projectedSchema.asStruct());
Objects.requireNonNull(rowDataProjection);
iterable=CloseableIterable.transform(iterable, rowDataProjection::wrap);
        }
returniterable.iterator();
    }

其中deletes.filter 对DataFile 数据进行过滤,删除DeleteFile 中的数据(疑惑点:如果DeleteFile 中的数据很大的处理逻辑是?)

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
消息中间件 Java Kafka
Flink CDC 在外部查询某个 job 中的表数据
【2月更文挑战第27天】Flink CDC 在外部查询某个 job 中的表数据
75 5
|
5月前
|
关系型数据库 MySQL Java
flink cdc 同步问题之多表数据如何同步
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 监控 Kafka
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
|
3月前
|
SQL 分布式计算 BI
实时计算 Flink版产品使用问题之基于宽表数据展示实时报表,该如何实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用问题之如何在外部查询某个job中的表数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL Kubernetes Java
实时计算 Flink版产品使用合集之遇到清空表数据遇到异常,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用合集之flinksql多流join,left右边任意一张表数据到后,是否都会更新test中对应的数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL API 数据处理
实时计算 Flink版产品使用合集之是否可以使用 Iceberg 将数据写入 HDFS
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 分布式计算 Kafka
Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
,Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
53 2

热门文章

最新文章