在文章Flink Table/SQL自定义Sources和Sinks全解析(附代码)中我们说到在Flink Table/SQL中如何自定义Sources和Sinks,有了上述文章的基础,我们再来理解Flink Table/SQL是如何实现Hudi的数据读取与写入就比较容易了。
动态表是自定义sources/sinks的核心,通过查阅源码我们可以知道在flink-hudi子模块中,org.apache.hudi.table.HoodieTableFactory
类同时实现了DynamicTableSourceFactory
和DynamicTableSinkFactory
两个接口,该类为提供特定于连接器的逻辑,也就是说当我们在flink sql中指定connector为hudi时,会走该处的逻辑
create table t1 (
col bigint
) with (
connector = "hudi",
...
)
HoodieTableFactory
类的UML图如下所示
静态变量FACTORY_ID和factoryIdentifier()方法指定了该connector的标志为hudi
。
public static final String FACTORY_ID = "hudi";
@Override
public String factoryIdentifier() {
return FACTORY_ID;
}
createDynamicTableSource()函数用于定义读取hudi数据源对应的HoodieTableSource,createDynamicTableSource()函数内容如下:
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
return new HoodieTableSource(
schema,
path,
context.getCatalogTable().getPartitionKeys(),
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
conf);
}
在createDynamicTableSource函数中,首先使用sanityCheck(conf, schema)对flink sql DDL中的schema和配置参数进行校验;然后使用flink sql DDL设置配置参数,比如说表名,主键等;最后将上一步得到的配置参数传给HoodieTableSource。
HoodieTableSource实现了ScanTableSource、SupportsPartitionPushDown、SupportsProjectionPushDown、SupportsLimitPushDown以及SupportsFilterPushDown。定义了读取Hudi表的方式,我们在使用flink读取hudi数据的时候,配置流读、批量读取、从某个时间点读取则是在此进行配置。
首先我们分析一下流读的场景,也就是在flink sql DDL中指定read.streaming.enabled = true
。
我们来看一下getScanRuntimeProvider函数,
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
return new DataStreamScanProvider() {
@Override
public boolean isBounded() {
return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
}
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
@SuppressWarnings("unchecked")
TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.setParallelism(1)
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
} else {
InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
}
}
};
}
当指定read.streaming.enabled = true
时,该函数走流读逻辑,也就是
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.setParallelism(1)
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
在上述流读逻辑中,StreamReadMonitoringFunction类扩展了RichSourceFunction和实现了CheckpointedFunction,该类指定了hudi数据源读取。我们再来看一下StreamReadMonitoringFunction类中的monitorDirAndForwardSplits方法,该方法具体执行了hudi数据源的读取方式。其方式为增量分片读取。
@Override
public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) throws Exception {
checkpointLock = context.getCheckpointLock();
while (isRunning) {
synchronized (checkpointLock) {
monitorDirAndForwardSplits(context);
}
TimeUnit.SECONDS.sleep(interval);
}
}
@VisibleForTesting
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
HoodieTableMetaClient metaClient = getOrCreateMetaClient();
if (metaClient == null) {
// table does not exist
return;
}
IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
if (result.isEmpty()) {
// no new instants, returns early
return;
}
for (MergeOnReadInputSplit split : result.getInputSplits()) {
context.collect(split);
}
// update the issues instant time
this.issuedInstant = result.getEndInstant();
LOG.info("\n"
+ "------------------------------------------------------------\n"
+ "---------- consumed to instant: {}\n"
+ "------------------------------------------------------------",
this.issuedInstant);
}
数据读取的时候也受checkpoint的影响,假如处于checkpoint,那么会停止读取直到chk结束,chk表示将当前读取的位置记录到状态中。由此我们可以知道当任务失败时,从上次chk点重启便可以从上次读取位置继续读取数据。
我们在定义hudi数据读取时,还可以定义read.start-commit
,从某个commit开始消费数据,这些配置参数具体在这里进行处理
IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
具体函数如下:
/**
* Returns the incremental input splits.
*
* @param metaClient The meta client
* @param hadoopConf The hadoop configuration
* @param issuedInstant The last issued instant, only valid in streaming read
* @return The list of incremental input splits or empty if there are no new instants
*/
public Result inputSplits(
HoodieTableMetaClient metaClient,
org.apache.hadoop.conf.Configuration hadoopConf,
String issuedInstant) {
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return Result.EMPTY;
}
List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
// get the latest instant that satisfies condition
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
if (issuedInstant != null) {
// the streaming reader may record the last issued instant, if the issued instant is present,
// the instant range should be: (issued instant, the latest instant].
instantRange = InstantRange.getInstance(issuedInstant, instantToIssue.getTimestamp(),
InstantRange.RangeType.OPEN_CLOSE);
} else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
// first time consume and has a start commit
final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
? null
: InstantRange.getInstance(startCommit, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
} else {
// first time consume and no start commit, consumes the latest incremental data set.
instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
InstantRange.RangeType.CLOSE_CLOSE);
}
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
return Result.EMPTY;
}
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
Set<String> writePartitions;
final FileStatus[] fileStatuses;
if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
if (this.requiredPartitions != null) {
// apply partition push down
fileIndex.setPartitionPaths(this.requiredPartitions);
}
writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
if (writePartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = fileIndex.getFilesInPartitions();
} else {
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;
writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitions != null) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
}
if (writePartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
}
if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
return Result.EMPTY;
}
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final String endInstant = instantToIssue.getTimestamp();
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
basePath, logPaths, endInstant,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
return Result.instance(inputSplits, endInstant);
}
当然,针对hudi数据源的读取方式是有很多种方式的,上述分析仅仅针对流读的简单分析,不过相信通过上述分析,也能够掌握其他读取方式的代码分析方法,并对问题分析起到一定的帮助。