
How Flink reads data from an Iceberg table

Summary: how Flink reads data from an Iceberg table, including how delete files are merged with the data files during the read.

1. Executing the CREATE TABLE statement

CREATE TABLE IF NOT EXISTS iceberg_hive.db1.sink6 (
  order_id INTEGER COMMENT 'unique id',
  `order_date` DATE ,
  `order_time` TIMESTAMP(3),
  `quantity` INT ,
  `product_id` INT ,
  `purchaser` STRING,
  primary key(order_id)  NOT ENFORCED
  ) with ('format-version' = '2')
  1. Resolve the catalog iceberg_hive.
  2. Look up the table in the IcebergCatalog cache; the table is created via org.apache.iceberg.BaseMetastoreCatalog.BaseMetastoreCatalogTableBuilder#create:
public Table create() {
  TableOperations ops = BaseMetastoreCatalog.this.newTableOps(this.identifier);
  if (ops.current() != null) {
    throw new AlreadyExistsException("Table already exists: %s", this.identifier);
  }

  String baseLocation = this.location != null ? this.location : BaseMetastoreCatalog.this.defaultWarehouseLocation(this.identifier);
  Map<String, String> properties = this.propertiesBuilder.build();
  TableMetadata metadata = TableMetadata.newTableMetadata(this.schema, this.spec, this.sortOrder, baseLocation, properties);

  try {
    ops.commit(null, metadata);
  } catch (CommitFailedException ignored) {
    throw new AlreadyExistsException("Table was created concurrently: %s", this.identifier);
  }

  return new BaseTable(ops, BaseMetastoreCatalog.fullTableName(BaseMetastoreCatalog.this.name(), this.identifier));
}
  1. A TableOperations is built (the SPI interface that abstracts table metadata access and updates).
  2. TableOperations.current() resolves the metadata location and parses metadata.json.
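
To make the lookup above concrete, here is a minimal sketch (not part of the Flink code path) of loading the same table directly through Iceberg's HiveCatalog. The metastore URI is a made-up placeholder, and the setConf/initialize calls may differ slightly between Iceberg versions:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

public class LoadSink6 {
  public static void main(String[] args) {
    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(new Configuration());   // picks up hive-site.xml / hdfs-site.xml from the classpath

    Map<String, String> props = new HashMap<>();
    props.put("uri", "thrift://hive-metastore:9083");          // assumption: metastore address
    props.put("warehouse", "hdfs://ns1/user/hive/warehouse");  // warehouse root seen in metadata.json
    catalog.initialize("iceberg_hive", props);

    // loadTable -> newTableOps(identifier) -> ops.current(), which parses the latest metadata.json
    Table table = catalog.loadTable(TableIdentifier.of("db1", "sink6"));
    System.out.println(table.currentSnapshot());
  }
}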


2. Parsing metadata.json

org.apache.iceberg.TableMetadataParser#fromJson(FileIO io, InputFile file, JsonNode node)

  1. The JsonNode is the JSON tree built from the byte stream read from metadata.json.

Its contents look like the following:

{
    "format-version":2,
    "table-uuid":"90dc63e6-7359-45f3-aa65-31bfb30d4a70",
    "location":"hdfs://ns1/user/hive/warehouse/luna.db/sink6",
    "last-sequence-number":2,
    "last-updated-ms":1648709808166,
    "last-column-id":6,
    "current-schema-id":0,
    "schemas":[
        {
            "type":"struct",
            "schema-id":0,
            "identifier-field-ids":[
                1
            ],
            "fields":[
                {
                    "id":1,
                    "name":"order_id",
                    "required":true,
                    "type":"int"
                },
                {
                    "id":2,
                    "name":"order_date",
                    "required":false,
                    "type":"date"
                },
                {
                    "id":3,
                    "name":"order_time",
                    "required":false,
                    "type":"timestamp"
                },
                {
                    "id":4,
                    "name":"quantity",
                    "required":false,
                    "type":"int"
                },
                {
                    "id":5,
                    "name":"product_id",
                    "required":false,
                    "type":"int"
                },
                {
                    "id":6,
                    "name":"purchaser",
                    "required":false,
                    "type":"string"
                }
            ]
        }
    ],
    "default-spec-id":0,
    "partition-specs":[
        {
            "spec-id":0,
            "fields":[

            ]
        }
    ],
    "last-partition-id":999,
    "default-sort-order-id":0,
    "sort-orders":[
        {
            "order-id":0,
            "fields":[

            ]
        }
    ],
    "properties":{

    },
    "current-snapshot-id":6397021693615244286,
    "snapshots":[
        {
            "sequence-number":1,
            "snapshot-id":586540949995254526,
            "timestamp-ms":1648709717719,
            "summary":{
                "operation":"append",
                "flink.job-id":"7e6f454848bd1f18edd776a4b5382547",
                "flink.max-committed-checkpoint-id":"1",
                "added-data-files":"4",
                "added-records":"10",
                "added-files-size":"7569",
                "changed-partition-count":"1",
                "total-records":"10",
                "total-files-size":"7569",
                "total-data-files":"4",
                "total-delete-files":"0",
                "total-position-deletes":"0",
                "total-equality-deletes":"0"
            },
            "manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-586540949995254526-1-2d77afb5-1afa-4186-bb9a-796c33ecbf2b.avro",
            "schema-id":0
        },
        {
            "sequence-number":2,
            "snapshot-id":6397021693615244286,
            "parent-snapshot-id":586540949995254526,
            "timestamp-ms":1648709808166,
            "summary":{
                "operation":"overwrite",
                "flink.job-id":"7e6f454848bd1f18edd776a4b5382547",
                "flink.max-committed-checkpoint-id":"4",
                "added-data-files":"1",
                "added-delete-files":"1",
                "added-records":"1",
                "added-files-size":"3474",
                "added-equality-deletes":"1",
                "changed-partition-count":"1",
                "total-records":"11",
                "total-files-size":"11043",
                "total-data-files":"5",
                "total-delete-files":"1",
                "total-position-deletes":"0",
                "total-equality-deletes":"1"
            },
            "manifest-list":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/snap-6397021693615244286-1-d3000ce2-fd59-4222-b8f5-6a9cc2a28399.avro",
            "schema-id":0
        }
    ],
    "snapshot-log":[
        {
            "timestamp-ms":1648709717719,
            "snapshot-id":586540949995254526
        },
        {
            "timestamp-ms":1648709808166,
            "snapshot-id":6397021693615244286
        }
    ],
    "metadata-log":[
        {
            "timestamp-ms":1648709686421,
            "metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00000-c9bcda85-ed5e-4582-89dd-c019b1771d1b.metadata.json"
        },
        {
            "timestamp-ms":1648709717719,
            "metadata-file":"hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/00001-a369e769-6043-49ea-9051-f03f87ff51fe.metadata.json"
        }
    ]
}
  1. A BaseSnapshot is built for each entry in the snapshots array above; at this point the TableMetadata is logically complete.
  2. Table creation is finished, i.e. the table metadata has been refreshed.
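
As a side note, the same parsing can be reproduced outside of Flink. A minimal sketch, assuming Iceberg's HadoopFileIO and using the metadata file path from the metadata-log entry above as a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;

public class DumpTableMetadata {
  public static void main(String[] args) {
    FileIO io = new HadoopFileIO(new Configuration());
    String location = "hdfs://ns1/user/hive/warehouse/luna.db/sink6/metadata/"
        + "00001-a369e769-6043-49ea-9051-f03f87ff51fe.metadata.json";

    TableMetadata metadata = TableMetadataParser.read(io, io.newInputFile(location));
    System.out.println("current snapshot: " + metadata.currentSnapshot().snapshotId());
    for (Snapshot snapshot : metadata.snapshots()) {
      // each element corresponds to a BaseSnapshot built from the "snapshots" array
      System.out.printf("seq=%d id=%d op=%s manifest-list=%s%n",
          snapshot.sequenceNumber(), snapshot.snapshotId(),
          snapshot.operation(), snapshot.manifestListLocation());
    }
  }
}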


3. When are a BaseSnapshot's dataManifests and deleteManifests loaded and set?

This happens mainly in the split-planning logic of Iceberg's FlinkInputFormat.

* The split-generation logic is implemented in org.apache.iceberg.flink.source.FlinkSplitGenerator#createInputSplits.

The overall flow:

(1) Build a TableScan (implementation chain: DataTableScan -> BaseTableScan -> TableScan).

TableScan is the API for configuring a table scan. TableScan objects are immutable and can be shared between threads; refinement methods, like select(Collection) and filter(Expression), create new TableScan instances.
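
As an illustration of that contract, a minimal sketch (the filter and projection are made up; `table` would be the Table loaded in part 1):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;

public class ScanRefinementSketch {
  static TableScan refine(Table table) {
    TableScan scan = table.newScan();                            // scans the current snapshot
    TableScan refined = scan
        .filter(Expressions.greaterThanOrEqual("order_id", 5))   // returns a new TableScan
        .select("order_id", "purchaser");                        // again returns a new instance
    // `scan` itself is unchanged: TableScan objects are immutable and safe to share across threads
    return refined;                                              // calling planFiles() on this drives step (2)
  }
}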

(2) org.apache.iceberg.DataTableScan#planFiles builds a ManifestGroup; note the call to snapshot.dataManifests(), which is where the concrete data-file information is obtained from the manifests.

@Override
public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
                                                 Expression rowFilter, boolean ignoreResiduals,
                                                 boolean caseSensitive, boolean colStats) {
  ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
      .caseSensitive(caseSensitive)
      .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
      .filterData(rowFilter)
      .specsById(ops.current().specsById())
      .ignoreDeleted();

  if (ignoreResiduals) {
    manifestGroup = manifestGroup.ignoreResiduals();
  }

  if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.dataManifests().size() > 1) {
    manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
  }

  return manifestGroup.planFiles();
}

(3) In the code above, the calls to snapshot.dataManifests() and snapshot.deleteManifests() load the concrete manifest information. This happens in org.apache.iceberg.BaseSnapshot#cacheManifests, which reads the manifest list at manifestListLocation; whether each manifest describes data files or delete files is determined by its content field.


private void cacheManifests() {
  if (allManifests == null) {
    // if manifests isn't set, then the snapshotFile is set and should be read to get the list
    this.allManifests = ManifestLists.read(io.newInputFile(manifestListLocation));
  }

  if (dataManifests == null || deleteManifests == null) {
    this.dataManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
        manifest -> manifest.content() == ManifestContent.DATA));
    this.deleteManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
        manifest -> manifest.content() == ManifestContent.DELETES));
  }
}

The ManifestFile entries obtained by reading the Avro manifest list look like the following (the original screenshots are not reproduced here):

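As a substitute for the screenshots, a minimal sketch that prints roughly the same ManifestFile fields (note that in newer Iceberg versions dataManifests()/deleteManifests() take a FileIO argument such as table.io()):

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class DumpManifests {
  static void dump(Table table) {
    Snapshot snapshot = table.currentSnapshot();
    for (ManifestFile manifest : snapshot.dataManifests()) {       // triggers cacheManifests()
      System.out.printf("DATA    path=%s addedFiles=%s seq=%d%n",
          manifest.path(), manifest.addedFilesCount(), manifest.sequenceNumber());
    }
    for (ManifestFile manifest : snapshot.deleteManifests()) {
      System.out.printf("DELETES path=%s addedFiles=%s seq=%d%n",
          manifest.path(), manifest.addedFilesCount(), manifest.sequenceNumber());
    }
  }
}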


(4) The key part is manifestGroup.planFiles().

* Build the delete-file index: DeleteFileIndex.build() takes the DeleteFile entries that were read and sorts them into eqFilesSortedBySeq and posFilesSortedBySeq.

The classification criterion is the content field recorded for each (Parquet) file:

public enum FileContent {
  DATA(0),
  POSITION_DELETES(1),
  EQUALITY_DELETES(2);

  private final int id;

  FileContent(int id) {
    this.id = id;
  }
}

The relevant code is as follows:

if (specsById.get(partition.first()).isUnpartitioned()) {
  Preconditions.checkState(globalDeletes == null, "Detected multiple partition specs with no partitions");

  List<Pair<Long, DeleteFile>> eqFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
      .filter(entry -> entry.file().content() == FileContent.EQUALITY_DELETES)
      .map(entry ->
          // a delete file is indexed by the sequence number it should be applied to
          Pair.of(entry.sequenceNumber() - 1, entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());

  globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
  globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

  List<Pair<Long, DeleteFile>> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
      .filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
      .map(entry -> Pair.of(entry.sequenceNumber(), entry.file()))
      .sorted(Comparator.comparingLong(Pair::first))
      .collect(Collectors.toList());

  long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
  DeleteFile[] files = posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

  sortedDeletesByPartition.put(partition, Pair.of(seqs, files));

}

For equality deletes the sequence number recorded in eqFilesSortedBySeq is entry.sequenceNumber() - 1, while for position deletes it is entry.sequenceNumber(). The reason is that an equality delete only applies to data files with a strictly smaller sequence number (rows written in the same commit as the delete are not affected), whereas a position delete also applies to data files with the same sequence number; indexing equality deletes at sequenceNumber - 1 lets both kinds be matched with the same "data sequence number <= indexed delete sequence number" check.
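
A small self-contained illustration of the effect (simplified, not the actual limitBySequenceNumber implementation): with the uniform rule "a delete applies when dataSeq <= indexedDeleteSeq", an equality delete committed at sequence 2 does not touch the data file added in that same commit, while a position delete committed at sequence 2 does:

public class DeleteSequenceSketch {
  // simplified stand-in for the sequence-number check in DeleteFileIndex
  static boolean applies(long dataSeq, long indexedDeleteSeq) {
    return dataSeq <= indexedDeleteSeq;
  }

  public static void main(String[] args) {
    long dataFileSeq = 2;             // data file written in the same commit as the deletes
    long eqDeleteIndexed = 2 - 1;     // equality delete indexed at sequenceNumber() - 1
    long posDeleteIndexed = 2;        // position delete indexed at sequenceNumber()

    System.out.println(applies(dataFileSeq, eqDeleteIndexed));   // false: same-commit rows are kept
    System.out.println(applies(dataFileSeq, posDeleteIndexed));  // true: positions can be deleted
  }
}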

  • org.apache.iceberg.DeleteFileIndex#forDataFile(long sequenceNumber, DataFile file)

(1) Look up the DeleteFile[] for the data file's partition;

(2) Associate the DataFile with the delete files that may apply to it;

DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
    Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
    Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);

    Stream<DeleteFile> matchingDeletes;
    if (partitionDeletes == null) {
      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
    } else if (globalDeletes == null) {
      matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
    } else {
      matchingDeletes = Stream.concat(
          limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
          limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
    }

    return matchingDeletes
        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
        .toArray(DeleteFile[]::new);
}

(3) Decide whether a DataFile and a DeleteFile are actually related: org.apache.iceberg.DeleteFileIndex#canContainEqDeletesForFile

* Get the value range of each column in the DataFile (the data file records lowerBounds and upperBounds).

* Get the value range of each column in the DeleteFile (the delete file likewise records lowerBounds and upperBounds).

* For the columns listed in deleteFile.equalityFieldIds(), compare the value ranges recorded in the DeleteFile and the DataFile. The comparison logic is in org.apache.iceberg.DeleteFileIndex#rangesOverlap(PrimitiveType type, ByteBuffer dataLowerBuf, ByteBuffer dataUpperBuf, ByteBuffer deleteLowerBuf, ByteBuffer deleteUpperBuf):

-- the ByteBuffer bounds are decoded according to the column type;

-- the two ranges overlap when comparator.compare(deleteLower, dataUpper) <= 0 && comparator.compare(dataLower, deleteUpper) <= 0 (see the sketch below).
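
A minimal reconstruction of that check for a single int column such as order_id, assuming Iceberg's public Conversions and Comparators helpers (the real rangesOverlap also handles missing bounds and every primitive type):

import java.nio.ByteBuffer;
import java.util.Comparator;

import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class RangesOverlapSketch {
  static boolean overlaps(ByteBuffer dataLower, ByteBuffer dataUpper,
                          ByteBuffer deleteLower, ByteBuffer deleteUpper) {
    Type.PrimitiveType type = Types.IntegerType.get();
    Comparator<Object> comparator = Comparators.forType(type);

    Object dLower = Conversions.fromByteBuffer(type, dataLower);
    Object dUpper = Conversions.fromByteBuffer(type, dataUpper);
    Object delLower = Conversions.fromByteBuffer(type, deleteLower);
    Object delUpper = Conversions.fromByteBuffer(type, deleteUpper);

    // the two ranges intersect iff each lower bound is <= the other range's upper bound
    return comparator.compare(delLower, dUpper) <= 0 && comparator.compare(dLower, delUpper) <= 0;
  }
}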

(5) Build a BaseFileScanTask: the steps above have determined each data file and the delete files associated with it. Its main fields are:

private final DataFile file;
private final DeleteFile[] deletes;
private final String schemaString;
private final String specString;
private final ResidualEvaluator residuals;
private transient PartitionSpec spec = null;
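
A minimal sketch of inspecting the resulting tasks (assuming the Table from part 1); each task carries one data file plus the delete files that were matched to it:

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class PrintScanTasks {
  static void print(Table table) throws Exception {
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println("data file: " + task.file().path());
        for (DeleteFile delete : task.deletes()) {
          System.out.println("  delete:  " + delete.path() + " content=" + delete.content());
        }
      }
    }
  }
}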


(6) Finally, FlinkInputSplit objects are built from these scan tasks.


4. FlinkInputFormat reads the splits
(1) The input format opens the split assigned to it:

public void open(FlinkInputSplit split)

The main iteration logic for reading the data is in org.apache.iceberg.flink.source.DataIterator:

(2) RowDataFileScanTaskReader#open:

public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
  Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
  Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
      PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);

  FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
  CloseableIterable<RowData> iterable =
      deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor));

  if (!projectedSchema.sameSchema(deletes.requiredSchema())) {
    RowDataProjection rowDataProjection = RowDataProjection.create(
        deletes.requiredRowType(), deletes.requiredSchema().asStruct(), projectedSchema.asStruct());
    iterable = CloseableIterable.transform(iterable, rowDataProjection::wrap);
  }

  return iterable.iterator();
}

Here deletes.filter filters the DataFile rows, removing the rows recorded in the associated DeleteFiles. (Open question: what is the processing logic when the data in the DeleteFiles is very large?)
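
To tie the pieces together, a minimal end-to-end sketch of reading this table from a Flink job; batch mode goes through exactly the FlinkSplitGenerator / FlinkInputFormat path discussed above. The API shown is the 0.12/0.13-era FlinkSource builder, and the metastore URI is a placeholder:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class BatchReadSink6 {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Map<String, String> props = new HashMap<>();
    props.put("uri", "thrift://hive-metastore:9083");          // assumption: metastore address
    props.put("warehouse", "hdfs://ns1/user/hive/warehouse");

    CatalogLoader catalogLoader = CatalogLoader.hive("iceberg_hive", new Configuration(), props);
    TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db1", "sink6"));

    // splits are planned by FlinkSplitGenerator and read by FlinkInputFormat / DataIterator
    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(false)
        .build();

    stream.print();
    env.execute("read iceberg db1.sink6");
  }
}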
