Hudi源码分析之使用Flink Table/SQL实现Hudi Sources

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文讲述Hudi源码分析之使用Flink Table/SQL实现Hudi Sources

在文章Flink Table/SQL自定义Sources和Sinks全解析(附代码)中我们说到在Flink Table/SQL中如何自定义Sources和Sinks,有了上述文章的基础,我们再来理解Flink Table/SQL是如何实现Hudi的数据读取与写入就比较容易了。

动态表是自定义sources/sinks的核心,通过查阅源码我们可以知道在flink-hudi子模块中,org.apache.hudi.table.HoodieTableFactory类同时实现了DynamicTableSourceFactoryDynamicTableSinkFactory两个接口,该类为提供特定于连接器的逻辑,也就是说当我们在flink sql中指定connector为hudi时,会走该处的逻辑

create table t1 (
  col bigint
) with (
  connector = "hudi",
  ...
)

HoodieTableFactory类的UML图如下所示

静态变量FACTORY_ID和factoryIdentifier()方法指定了该connector的标志为hudi

  public static final String FACTORY_ID = "hudi";
  
    @Override
  public String factoryIdentifier() {
    return FACTORY_ID;
  }

createDynamicTableSource()函数用于定义读取hudi数据源对应的HoodieTableSource,createDynamicTableSource()函数内容如下:

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    sanityCheck(conf, schema);
    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);

    Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
        new ValidationException("Option [path] should not be empty.")));
    return new HoodieTableSource(
        schema,
        path,
        context.getCatalogTable().getPartitionKeys(),
        conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
        conf);
  }

在createDynamicTableSource函数中,首先使用sanityCheck(conf, schema)对flink sql DDL中的schema和配置参数进行校验;然后使用flink sql DDL设置配置参数,比如说表名,主键等;最后将上一步得到的配置参数传给HoodieTableSource。

HoodieTableSource实现了ScanTableSource、SupportsPartitionPushDown、SupportsProjectionPushDown、SupportsLimitPushDown以及SupportsFilterPushDown。定义了读取Hudi表的方式,我们在使用flink读取hudi数据的时候,配置流读、批量读取、从某个时间点读取则是在此进行配置。

首先我们分析一下流读的场景,也就是在flink sql DDL中指定read.streaming.enabled = true

我们来看一下getScanRuntimeProvider函数,

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    return new DataStreamScanProvider() {

      @Override
      public boolean isBounded() {
        return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
      }

      @Override
      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        @SuppressWarnings("unchecked")
        TypeInformation<RowData> typeInfo =
            (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
        if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
          StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
              conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
          SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
              .setParallelism(1)
              .transform("split_reader", typeInfo, factory)
              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
          return new DataStreamSource<>(source);
        } else {
          InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
          DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
          return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
        }
      }
    };
  }

当指定read.streaming.enabled = true时,该函数走流读逻辑,也就是

          StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
              conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
          SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
              .setParallelism(1)
              .transform("split_reader", typeInfo, factory)
              .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
          return new DataStreamSource<>(source);

在上述流读逻辑中,StreamReadMonitoringFunction类扩展了RichSourceFunction和实现了CheckpointedFunction,该类指定了hudi数据源读取。我们再来看一下StreamReadMonitoringFunction类中的monitorDirAndForwardSplits方法,该方法具体执行了hudi数据源的读取方式。其方式为增量分片读取。


  @Override
  public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) throws Exception {
    checkpointLock = context.getCheckpointLock();
    while (isRunning) {
      synchronized (checkpointLock) {
        monitorDirAndForwardSplits(context);
      }
      TimeUnit.SECONDS.sleep(interval);
    }
  }

  @VisibleForTesting
  public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
    HoodieTableMetaClient metaClient = getOrCreateMetaClient();
    if (metaClient == null) {
      // table does not exist
      return;
    }
    IncrementalInputSplits.Result result =
        incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
    if (result.isEmpty()) {
      // no new instants, returns early
      return;
    }

    for (MergeOnReadInputSplit split : result.getInputSplits()) {
      context.collect(split);
    }
    // update the issues instant time
    this.issuedInstant = result.getEndInstant();
    LOG.info("\n"
            + "------------------------------------------------------------\n"
            + "---------- consumed to instant: {}\n"
            + "------------------------------------------------------------",
        this.issuedInstant);
  }

数据读取的时候也受checkpoint的影响,假如处于checkpoint,那么会停止读取直到chk结束,chk表示将当前读取的位置记录到状态中。由此我们可以知道当任务失败时,从上次chk点重启便可以从上次读取位置继续读取数据。

我们在定义hudi数据读取时,还可以定义read.start-commit,从某个commit开始消费数据,这些配置参数具体在这里进行处理

    IncrementalInputSplits.Result result =
        incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);

具体函数如下:

/**
   * Returns the incremental input splits.
   *
   * @param metaClient    The meta client
   * @param hadoopConf    The hadoop configuration
   * @param issuedInstant The last issued instant, only valid in streaming read
   * @return The list of incremental input splits or empty if there are no new instants
   */
  public Result inputSplits(
      HoodieTableMetaClient metaClient,
      org.apache.hadoop.conf.Configuration hadoopConf,
      String issuedInstant) {
    metaClient.reloadActiveTimeline();
    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
    if (commitTimeline.empty()) {
      LOG.warn("No splits found for the table under path " + path);
      return Result.EMPTY;
    }
    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
    // get the latest instant that satisfies condition
    final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
    final InstantRange instantRange;
    if (instantToIssue != null) {
      if (issuedInstant != null) {
        // the streaming reader may record the last issued instant, if the issued instant is present,
        // the instant range should be: (issued instant, the latest instant].
        instantRange = InstantRange.getInstance(issuedInstant, instantToIssue.getTimestamp(),
            InstantRange.RangeType.OPEN_CLOSE);
      } else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
        // first time consume and has a start commit
        final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
        instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
            ? null
            : InstantRange.getInstance(startCommit, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
      } else {
        // first time consume and no start commit, consumes the latest incremental data set.
        instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
            InstantRange.RangeType.CLOSE_CLOSE);
      }
    } else {
      LOG.info("No new instant found for the table under path " + path + ", skip reading");
      return Result.EMPTY;
    }

    String tableName = conf.getString(FlinkOptions.TABLE_NAME);

    Set<String> writePartitions;
    final FileStatus[] fileStatuses;

    if (instantRange == null) {
      // reading from the earliest, scans the partitions and files directly.
      FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
      if (this.requiredPartitions != null) {
        // apply partition push down
        fileIndex.setPartitionPaths(this.requiredPartitions);
      }
      writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
      if (writePartitions.size() == 0) {
        LOG.warn("No partitions found for reading in user provided path.");
        return Result.EMPTY;
      }
      fileStatuses = fileIndex.getFilesInPartitions();
    } else {
      List<HoodieCommitMetadata> activeMetadataList = instants.stream()
          .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
      List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
      if (archivedMetadataList.size() > 0) {
        LOG.warn("\n"
            + "--------------------------------------------------------------------------------\n"
            + "---------- caution: the reader has fall behind too much from the writer,\n"
            + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
            + "--------------------------------------------------------------------------------");
      }
      List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
          // IMPORTANT: the merged metadata list must be in ascending order by instant time
          ? mergeList(archivedMetadataList, activeMetadataList)
          : activeMetadataList;

      writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
      // apply partition push down
      if (this.requiredPartitions != null) {
        writePartitions = writePartitions.stream()
            .filter(this.requiredPartitions::contains).collect(Collectors.toSet());
      }
      if (writePartitions.size() == 0) {
        LOG.warn("No partitions found for reading in user provided path.");
        return Result.EMPTY;
      }
      fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
    }

    if (fileStatuses.length == 0) {
      LOG.warn("No files found for reading in user provided path.");
      return Result.EMPTY;
    }

    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
    final String endInstant = instantToIssue.getTimestamp();
    final AtomicInteger cnt = new AtomicInteger(0);
    final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
    List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
        .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
            .map(fileSlice -> {
              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                  .sorted(HoodieLogFile.getLogFileComparator())
                  .map(logFile -> logFile.getPath().toString())
                  .collect(Collectors.toList()));
              String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
                  basePath, logPaths, endInstant,
                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
            }).collect(Collectors.toList()))
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
    return Result.instance(inputSplits, endInstant);
  }
  

当然,针对hudi数据源的读取方式是有很多种方式的,上述分析仅仅针对流读的简单分析,不过相信通过上述分析,也能够掌握其他读取方式的代码分析方法,并对问题分析起到一定的帮助。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
26天前
|
SQL 缓存 监控
14个Flink SQL性能优化实践分享
【7月更文挑战第12天】 1. **合理设置并行度**: 根据数据量和资源调整以提高处理速度. 2. **优化数据源**: 使用分区表并进行预处理减少输入量. 3. **数据缓存**: 采用 `BROADCAST` 或 `REPARTITION` 缓存常用数据. 4. **索引和分区**: 创建索引并按常用字段分区. 5. **避免不必要的计算**: 检查并移除多余的计算步骤. 6. **调整内存配置**: 分配足够内存避免性能下降. 7. **优化连接操作**: 选择适合大表和小表的连接方式. 8. **数据类型优化**: 选择合适类型以节省资源. ........
|
11天前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
31 6
|
11天前
|
SQL 大数据 测试技术
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【8月更文挑战第9天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法深受开发者青睐。本文分享了编写高效 Flink SQL 的实用技巧:首先需深刻理解数据特性与业务目标;其次,合理运用窗口函数(如 TUMBLE 和 HOP)可大幅提升效率;优化连接操作,优先采用等值连接并恰当选择连接表;正确选取数据类型以减少类型转换开销;最后,持续进行性能测试与调优。通过这些方法,我们能在实际项目中(如实时电商数据分析)更高效地处理数据,挖掘出更多价值。
29 6
|
11天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【8月更文挑战第9天】在大数据时代,Apache Flink以其强大的流处理能力脱颖而出,而Flink SQL则为数据处理带来了灵活性。本文介绍如何运用Flink SQL实现数据脱敏——一项关键的隐私保护技术。通过内置函数与表达式,在SQL查询中加入脱敏逻辑,可有效处理敏感信息,如个人身份与财务数据,以符合GDPR等数据保护法规。示例展示了如何对信用卡号进行脱敏,采用`CASE`语句检查并替换敏感数据。此外,Flink SQL支持自定义函数,适用于更复杂的脱敏需求。掌握此技能对于保障数据安全至关重要。
31 5
|
12天前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
39 1
|
14天前
|
SQL Java Scala
flink-cdc SQL Server op 字段如何获取?
Flink CDC 是 Apache Flink 的组件,用于捕获数据库变更事件。对 SQL Server,通过 Debezium 连接器支持变更数据捕获。`op` 字段标识操作类型(INSERT、UPDATE、DELETE)。配置包括添加依赖及设定 Source 连接器,可通过 Flink SQL 或 Java/Scala 完成。示例查询利用 `op` 字段筛选处理变更事件。
23 1
|
23天前
|
SQL 数据处理 Apache
Apache Flink SQL:实时计算的核心引擎
Apache Flink SQL 的一些核心功能,并探讨了其在实时计算领域的应用。随着 Flink 社区的不断发展和完善,Flink SQL 将变得越来越强大,为实时数据分析带来更多的可能性。
|
27天前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
45 13
|
24天前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
22天前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
34 6