flink 读取iceberg 表数据流程

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink 如何读取iceberg 表数据,包括删除的数据文件如何合并

一.执行建立表语句

CREATE TABLE IF NOT EXISTS iceberg_hive.db1.sink6 (
  order_id INTEGER COMMENT 'unique id',
  `order_date` DATE ,
  `order_time` TIMESTAMP(3),
  `quantity` INT ,
  `product_id` INT ,
  `purchaser` STRING,
  primary key(order_id)  NOT ENFORCED
  ) with ('format-version' = '2')
  1. 获取catalog iceberg_hvie
  2. 从icebergCatalog cache 获取对应的表信息,org.apache.iceberg.BaseMetastoreCatalog.BaseMetastoreCatalogTableBuilder#create
publicTablecreate() {
TableOperationsops=BaseMetastoreCatalog.this.newTableOps(this.identifier);
if (ops.current() !=null) {
thrownewAlreadyExistsException("Table already exists: %s", newObject[]{this.identifier});
    } else {
StringbaseLocation=this.location!=null?this.location : BaseMetastoreCatalog.this.defaultWarehouseLocation(this.identifier);
Map<String, String>properties=this.propertiesBuilder.build();
TableMetadatametadata=TableMetadata.newTableMetadata(this.schema, this.spec, this.sortOrder, baseLocation, properties);
try {
ops.commit((TableMetadata)null, metadata);
        } catch (CommitFailedExceptionvar6) {
thrownewAlreadyExistsException("Table was created concurrently: %s", newObject[]{this.identifier});
        }
returnnewBaseTable(ops, BaseMetastoreCatalog.fullTableName(BaseMetastoreCatalog.this.name(), this.identifier));
    }
}
  1. 构建TableOperations(SPI interface to abstract table metadata access and updates.)
  2. TableOperations.current() 获取metadataLocation 位置并解析metadata.json


二.解析metadata.json

org.apache.iceberg.TableMetadataParser#fromJson(FileIO io, InputFile file, JsonNode node)

  1. JsonNode 是从metadata.json 获取到的数据流构建的json 信息

内容包括:

{
"format-version":2,
"table-uuid":"90dc63e6-7359-45f3-aa65-31bfb30d4a70",
"location":"hdfs://ns1/user/hive/warehouse/luna.db/sink6",
"last-sequence-number":2,
"last-updated-ms":1648709808166,
"last-column-id":6,
"current-schema-id":0,
"schemas":[
        {
"type":"struct",
"schema-id":0,
"identifier-field-ids":[
1            ],
"fields":[
                {
"id":1,
"name":"order_id",
"required":true,
"type":"int"                },
                {
"id":2,
"name":"order_date",
"required":false,
"type":"date"                                       },
                {
"id":3,
"name":"order_time",
"required":false,
"type":"timestamp"                },
                {
"id":4,
"name":"quantity",
"required":false,
"type":"int"                },
                {
"id":5,
"name":"product_id",
"required":false,
"type":"int"                },
                {
"id":6,
"name":"purchaser",
"required":false,
"type":"string"                }
            ]
        }
    ],
"default-spec-id":0,
"partition-specs":[
        {
"spec-id":0,
"fields":[
            ]
        }
    ],
"last-partition-id":999,
"default-sort-order-id":0,
"sort-orders":[
        {
"order-id":0,
"fields":[
            ]
        }
    ],
"properties":{
    },
"current-snapshot-id":6397021693615244286,
"snapshots":[
        {
"sequence-number":1,
"snapshot-id":586540949995254526,
"timestamp-ms":1648709717719,
"summary":{
"operation":"append",
"flink.job-id":"7e6f454848bd1f18edd776a4b5382547",
"flink.max-committed-checkpoint-id":"1",
"added-data-files":"4",
"added-records":"10",
"added-files-size":"7569",
"changed-partition-count":"1",
"total-records":"10",
"total-files-size":"7569",
"total-data-files":"4",
"total-delete-files":"0",
"total-position-deletes":"0",
"total-equality-deletes":"0"            },
"manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-586540949995254526-1-2d77afb5-1afa-4186-bb9a-796c33ecbf2b.avro",
"schema-id":0        },
        {
"sequence-number":2,
"snapshot-id":6397021693615244286,
"parent-snapshot-id":586540949995254526,
"timestamp-ms":1648709808166,
"summary":{
"operation":"overwrite",
"flink.job-id":"7e6f454848bd1f18edd776a4b5382547",
"flink.max-committed-checkpoint-id":"4",
"added-data-files":"1",
"added-delete-files":"1",
"added-records":"1",
"added-files-size":"3474",
"added-equality-deletes":"1",
"changed-partition-count":"1",
"total-records":"11",
"total-files-size":"11043",
"total-data-files":"5",
"total-delete-files":"1",
"total-position-deletes":"0",
"total-equality-deletes":"1"            },
"manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-6397021693615244286-1-d3000ce2-fd59-4222-b8f5-6a9cc2a28399.avro",
"schema-id":0        }
    ],
"snapshot-log":[
        {
"timestamp-ms":1648709717719,
"snapshot-id":586540949995254526        },
        {
"timestamp-ms":1648709808166,
"snapshot-id":6397021693615244286        }
    ],
"metadata-log":[
        {
"timestamp-ms":1648709686421,
"metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00000-c9bcda85-ed5e-4582-89dd-c019b1771d1b.metadata.json"        },
        {
"timestamp-ms":1648709717719,
"metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00001-a369e769-6043-49ea-9051-f03f87ff51fe.metadata.json"        }
    ]
}
  1. 根据上面信息中的snapshot 节点构建BaseSnapshot ,从逻辑上讲当前构建TableMetadata 完成;
  2. 建立表完成,也就是表元数据刷新完成;


三:那BaseSnapshot 的dataManifests,deleteManifets 什么时候获取和设置呢

该部分内容主要在Iceberg 的FlinkInputFormat 的分片逻辑里面;

* 分配逻辑主要是在 org.apache.iceberg.flink.source.FlinkSplitGenerator#createInputSplits 中实现

大致逻辑:

(1) 构建TableScan(实现类DataTableScan -> BaseTableScan -> TableScan)

TableScan 定义:API for configuring a table scan.TableScan objects are immutable and can be shared between threads. Refinement methods, like select(Collection) and filter(Expression), create new TableScan instances.

(2) 通过org.apache.iceberg.DataTableScan#planFiles 读取manifestGroup,关注snapshot.dataManifests()===》从数据文件获取具体的文件信息;

@OverridepublicCloseableIterable<FileScanTask>planFiles(TableOperationsops, Snapshotsnapshot,
ExpressionrowFilter, booleanignoreResiduals,
booleancaseSensitive, booleancolStats) {
ManifestGroupmanifestGroup=newManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
      .caseSensitive(caseSensitive)
      .select(colStats?SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
      .filterData(rowFilter)
      .specsById(ops.current().specsById())
      .ignoreDeleted();
if (ignoreResiduals) {
manifestGroup=manifestGroup.ignoreResiduals();
  }
if (PLAN_SCANS_WITH_WORKER_POOL&&snapshot.dataManifests().size() >1) {
manifestGroup=manifestGroup.planWith(ThreadPools.getWorkerPool());
  }
returnmanifestGroup.planFiles();
}

(3) 在上面的代码中snapshot.dataManifests(), snapshot.deleteManifests() 这两处的调用会获取具体的manifest信息,具体获取信息的过程是在:org.apache.iceberg.BaseSnapshot#cacheManifests 中读取具体的manifestListLocation 获取manifest信息,其中判断文件的类型是通过文件的content 内容来判断是data 文件还是deletes 文件;


privatevoidcacheManifests() {
if (allManifests==null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the listthis.allManifests=ManifestLists.read(io.newInputFile(manifestListLocation));
  }
if (dataManifests==null||deleteManifests==null) {
this.dataManifests=ImmutableList.copyOf(Iterables.filter(allManifests,
manifest->manifest.content() ==ManifestContent.DATA));
this.deleteManifests=ImmutableList.copyOf(Iterables.filter(allManifests,
manifest->manifest.content() ==ManifestContent.DELETES));
  }
}

读取avro 文件获取到的manifestFile 的信息如下:

image.png

image.png


(4) 重点关注下manifestGroup.planFiles()

*  构建deleteFiles . DeleteFileIndex.build(); 读取的deleteFile 信息;判断是eqFilesSortedBySeq, posFilesSortedBySeq;

判断条件:.parquet文件的content 信息:

public enum FileContent {

DATA(0),

POSITION_DELETES(1),

EQUALITY_DELETES(2);

}

具体代码如下:

if (specsById.get(partition.first()).isUnpartitioned()) {
Preconditions.checkState(globalDeletes==null, "Detected multiple partition specs with no partitions");
List<Pair<Long, DeleteFile>>eqFilesSortedBySeq=deleteFilesByPartition.get(partition).stream()
      .filter(entry->entry.file().content() ==FileContent.EQUALITY_DELETES)
      .map(entry->// a delete file is indexed by the sequence number it should be applied toPair.of(entry.sequenceNumber() -1, entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());
globalApplySeqs=eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes=eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
List<Pair<Long, DeleteFile>>posFilesSortedBySeq=deleteFilesByPartition.get(partition).stream()
      .filter(entry->entry.file().content() ==FileContent.POSITION_DELETES)
      .map(entry->Pair.of(entry.sequenceNumber(), entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());
long[] seqs=posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files=posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
}

在equality_deletes 里 设定 的seqNum 是 entry.sequenceNumber() - 1;而position_deletes 里 设定的seqNum 是entry.sequenceNumber();原因是xxx

  • org.apache.iceberg.DeleteFileIndex#forDataFile(long sequenceNumber, DataFile file)

 (1) 获取分区对应的DeleteFile[];

 (2) 关联DataFile 对应的删除文件;

DeleteFile[] forDataFile(longsequenceNumber, DataFilefile) {
Pair<Integer, StructLikeWrapper>partition=partition(file.specId(), file.partition());
Pair<long[], DeleteFile[]>partitionDeletes=sortedDeletesByPartition.get(partition);
Stream<DeleteFile>matchingDeletes;
if (partitionDeletes==null) {
matchingDeletes=limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
    } elseif (globalDeletes==null) {
matchingDeletes=limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
    } else {
matchingDeletes=Stream.concat(
limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
    }
returnmatchingDeletes        .filter(deleteFile->canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
        .toArray(DeleteFile[]::new);
  }

(3)判断DataFile 和 DeleteFile 是否存在关联org.apache.iceberg.DeleteFileIndex#canContainEqDeletesForFile

   * 获取DataFile 各个字段值的范围-DataFile 文件中记录了对应的信息(lowerBounds,upperBounds)

   * 获取DeleteFile 各个字段值的范围-DeleteFile 文件中记录了对应的信息(lowerBounds,upperBounds)

  *  比较deleteFile.equalityFieldIds() 获取到的字段在DeleteFile,DataFile 中的值的范围边界;   具体的比较逻辑参考:org.apache.iceberg.DeleteFileIndex#rangesOverlap(PrimitiveType type, ByteBuffer dataLowerBuf, ByteBuffer dataUpperBuf, ByteBuffer deleteLowerBuf, ByteBuffer deleteUpperBuf)

--根据字段的类型转换出对应的ByteBuffer的值;

-- 判断逻辑是否存在交集:comparator.compare(deleteLower, dataUpper) <= 0 && comparator.compare(dataLower, deleteUpper) <= 0;

(5)构建出BaseFileScanTask : 上面已经判断出对应的data 文件,和关联的Delete文件

private final DataFile file;
private final DeleteFile[] deletes;
private final String schemaString;
private final String specString;
private final ResidualEvaluator residuals;
private transient PartitionSpec spec = null;

image.png

(6)构建完成FlinkInputSplit

image.png

三:FlinkInputFormat 读取分片信息
(1) input读取到对应的分片:

public void open(FlinkInputSplit split)

主要的读取数据迭代逻辑在org.apache.iceberg.flink.source.DataIterator中:

(2)RowDataFileScanTaskReader

publicCloseableIterator<RowData>open(FileScanTasktask, InputFilesDecryptorinputFilesDecryptor) {
SchemapartitionSchema=TypeUtil.select(this.projectedSchema, task.spec().identitySourceIds());
Map<Integer, ?>idToConstant=partitionSchema.columns().isEmpty() ?ImmutableMap.of() : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
RowDataFileScanTaskReader.FlinkDeleteFilterdeletes=newRowDataFileScanTaskReader.FlinkDeleteFilter(task, this.tableSchema, this.projectedSchema, inputFilesDecryptor);
CloseableIterable<RowData>iterable=deletes.filter(this.newIterable(task, deletes.requiredSchema(), (Map)idToConstant, inputFilesDecryptor));
if (!this.projectedSchema.sameSchema(deletes.requiredSchema())) {
RowDataProjectionrowDataProjection=RowDataProjection.create(deletes.requiredRowType(), deletes.requiredSchema().asStruct(), this.projectedSchema.asStruct());
Objects.requireNonNull(rowDataProjection);
iterable=CloseableIterable.transform(iterable, rowDataProjection::wrap);
        }
returniterable.iterator();
    }

其中deletes.filter 对DataFile 数据进行过滤,删除DeleteFile 中的数据(疑惑点:如果DeleteFile 中的数据很大的处理逻辑是?)

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
消息中间件 Java Kafka
Flink CDC 在外部查询某个 job 中的表数据
【2月更文挑战第27天】Flink CDC 在外部查询某个 job 中的表数据
161 5
|
关系型数据库 MySQL Java
flink cdc 同步问题之多表数据如何同步
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
4月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
740 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 监控 Kafka
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
|
SQL 分布式计算 BI
实时计算 Flink版产品使用问题之基于宽表数据展示实时报表,该如何实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用问题之如何在外部查询某个job中的表数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL Kubernetes Java
实时计算 Flink版产品使用合集之遇到清空表数据遇到异常,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用合集之flinksql多流join,left右边任意一张表数据到后,是否都会更新test中对应的数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
170 1
|
消息中间件 分布式计算 Kafka
Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
,Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
216 2