1.前言
Flink是流计算领域的佼佼者,Hudi在数据湖领域的热度一直很高,两者的结合也是很多公司的选择。如果要深入了解Flink + Hudi技术的应用或者性能调优,那么了解源码中的原理会对我们有很大的帮助,本文主要围绕着Flink对Hudi的写入流程进行分析,从而去理解Hudi中的各种核心概念,像Copy-on-Write(COW)、Merge-on-Read(MOR)、File Layouts(文件布局)、Timeline(时间线)等,本文默认大家对这些概念有所了解,在文章中的代码分析时会涉及到这些概念将不会进行详细的讲解。
2.源码阅读demo
源码阅读最重要的是可以进行断点跟读,而一般最简单的demo代码能够提高源码阅读的效果,可以通过下面的demo代码来观察Flink写入Hudi的完整过程
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
// 为了更方便观察源码过程,这里将Bucket的缓存大小设置为0.5kb
options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.0005");
// 为了更方便观察源码过程,这里将完整流程中各任务的并行度设置为1
options.put(FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), "1");
options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), "1");
options.put(FlinkOptions.WRITE_TASKS.key(), "1");
options.put(FlinkOptions.COMPACTION_TASKS.key(), "1");
// 为了更方便观察源码过程,将compact的触发机制设置为间隔2次delta_commit
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), FlinkOptions.NUM_COMMITS);
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("uuid VARCHAR(20)")
.column("name VARCHAR(10)")
.column("age INT")
.column("ts INT")
.column("`partition` VARCHAR(20)")
.pk("uuid")
.partition("partition")
.options(options);
builder.sink(dataStream, false);
env.execute("HudiWriteDemoWithFlink");
3.Flink写入过程的完整流程介绍
Flink写入外部存储的接口是DynamicTableSink,Hudi通过HoodieTableSink来实现Flink的写入接口,核心的写入逻辑位于getSinkRuntimeProvider,由于篇幅的影响,本文只要会在每个环节中选择一条链路进行讲解。
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return (DataStreamSinkProviderAdapter) dataStream -> {
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
// 设置Hudi的instant commit的超时时间为Flink的checkpoint超时时间,因为Flink将instant的commit过程放在checkpoint的完成时执行
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
// 当写入操作类型为bulk_insert的时候会进行到这个分支,本文主要介绍Flink的流写过程,我们采用的写入操作类型upsert
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
return Pipelines.bulkInsert(conf, rowType, dataStream);
}
// 当操作类型为insert的时候,对于mor类型的表,数据的写入为append的方式,对于cow的类型的表,只有采用write.insert.cluster=flase的时候才能采用append的方式,当write.insert.cluster=true的时候,在写入数据的时候会对小文件进行merge的操作,这个分支也不是本次讲解的路径
if (OptionsResolver.isAppendMode(conf)) {
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
if (OptionsResolver.needsAsyncClustering(conf)) {
return Pipelines.cluster(conf, rowType, pipeline);
} else {
return Pipelines.dummySink(pipeline);
}
}
DataStream<Object> pipeline;
// 从这里开始是本文分析等核心起点,这里是将数据源生成HoodieRecord DataStream的过程,默认会进行key和fileid索引的加载过程,Flink在这个写入过程中没有去加载文件的index,默认会将文件中的key和fileid对读取出来下发到下游,利用Flink的状态机制进行保存
final DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
// Flink将生成的HoodieRecord DataStream持久化,在持久化的过程中,涉及到instant的commit过程,当写入任务的所有并行度完成写持久化操作后,会将写入结果上报到OperatorCoordinator,最后在checkpoint完成的时候,会通知OperatorCoordinator进行instant的commit过程和开启下一阶段instant的requested过程
pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
// 当表类型为mor时,同时设置了异步compact,则会在Flink的DAG中增加任务来处理compact的过程
if (OptionsResolver.needsAsyncCompaction(conf)) {
// use synchronous compaction for bounded source.
if (context.isBounded()) {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
return Pipelines.compact(conf, pipeline);
} else {
return Pipelines.clean(conf, pipeline);
}
};
完整的主要流程可以通过下图来表示,其中各环节中的具体步骤将在后面环节进行讲解
4.HoodieRecord DataStream生成过程详解
HoodieRecord DataStream生成过程的详细代码为
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
RowType rowType,
DataStream<RowData> dataStream,
boolean bounded,
boolean overwrite) {
// 是否更新旧分区路径的索引
final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
// 当overwrite或者索引类型为bucket时,这一步这一步只是做DataStream转化操作
return rowDataToHoodieRecord(conf, rowType, dataStream);
} else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
// 当满足批任务、允许多分区存在相同key、分区表时,
return boundedBootstrap(conf, rowType, dataStream);
} else {
// 对于其他类型,flink利用flink的state特性加载旧分区的索引信息,这一部份是本文的核心分析,详细分析看4.2部分
return streamBootstrap(conf, rowType, dataStream, bounded);
}
}
4.1 rowDataToHoodieRecord
在RowDataToHoodieFunction算子中实现,核心是将RowData转化成HoodieAvroRecord,HoodieAvroRecord的主要成员如下:
//主要包括数据的recordKey及分区信息partitionPath,根据建表信息从原始数据中提取
HoodieKey key;
//原始数据,包含输入的所有字段
T data;
//数据在Hudi中的操作类型,描述在changelog
HoodieOperation operation;
//数据对应在Hudi中的文件,这一步该值为null,在进行2.2步骤后会进行填充
HoodieRecordLocation currentLocation;
4.2 BootstrapOperator
核心代码如下:
private static DataStream<HoodieRecord> streamBootstrap(
Configuration conf,
RowType rowType,
DataStream<RowData> dataStream,
boolean bounded) {
// 对DataStream进行转化成HoodieRecord封装类
DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
// 当设置启动加载状态或者处理批任务时,会将历史数据的索引信息加载到flink下游算子的状态中
dataStream1 = dataStream1
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
.uid(opUID("index_bootstrap", conf));
}
return dataStream1;
}
BootstrapOperator的作用是将Hudi表中已经持久化的HoodieKey和对应的文件FileSlice信息加载到下游算子(BucketAssignFunction)的状态中,供新增数据查找bucket使用。核心实现逻辑在initializeState方法中,首先拉取到Hudi表目录下的所有分区路径,然后根据分区路径进行正则匹配,匹配上的就加载该分区下的所有的HoodieKey,加载的时候没有使用文件的Index,而是通过读取分区下的所有文件的key和partitionPath,具体实现可以对loadRecords方法进行跟读。
Flink天然支持状态,所以能够将Hudi的HoodieKey和FileSlice关系对保存在状态中供新增数据进行查找,而不是像Spark那样通过加载表的Index来进行查找,这样可以复用Flink的状态管理机制及Checkpoint机制,不用用户再进行Index加载和由此产生的内存管理等复杂操作了,同时,Hudi采用的Index是有可能存在假阳性的可能,而通过状态的方式,可以保证下游算子中的状态都是准确的。
5.HoodieRecord DataStream流写过程详解
写入的完整过程的代码如下:
public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
if (OptionsResolver.isBucketIndexType(conf)) {
// 当Hudi的索引类型采用bucket的时候,Flink任务会缓存分区/bucket/fileId映射关系,然后根据数据通过内存中的映射关系找到对应文件
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid(opUID("bucket_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
// 其他类型将会进行这一步
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
// 按key进行分组,避免多任务在同一时间对一个桶进行写入操作
.keyBy(HoodieRecord::getRecordKey)
// 再对数据进行bucket分配操作,这里会使用到启动时加载的状态(如果设置了启动加载)以及增量数据更新的状态信息
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid(opUID("bucket_assigner", conf))
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
// 根据fileId进行分组,保证同一个文件的写入在同一个任务里
.keyBy(record -> record.getCurrentLocation().getFileId())
// 执行数据写入文件操作,这一步将是后面核心分析的重点
.transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid(opUID("stream_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
}
5.1 分区下数据的bucket分配
核心类为 BucketAssignFunction,核心处理代码如下所示:
private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
...
// 获取算子状态中的索引
HoodieRecordGlobalLocation oldLoc = indexState.value();
if (isChangingRecords && oldLoc != null) {
// 当操作类型是UPSERT/UPSERT_PREPPED/DELETE,同时状态有定义的时候,会从状态中查找location(fileId),并且将instant time标记为‘U’,表明是一个更新bucket操作
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
// 当与旧分区不同,则表示key跨分区了,则会根据配置进行后面的操作
if (globalIndex) {
// 当配置需要更新旧分区索引的时候,则在旧分区需要新增一条删除的记录
HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.unseal();
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
// 在新分区新一条插入记录
location = getNewRecordLocation(partitionPath);
} else {
// 没有跨分区,则直接返回状态中的索引
location = oldLoc.toLocal("U");
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
// 新增一条索引信息
location = getNewRecordLocation(partitionPath);
}
if (isChangingRecords) {
// 更新状态里的索引
updateIndexState(partitionPath, location);
}
record.unseal();
record.setCurrentLocation(location);
record.seal();
out.collect((O) record);
}
Hudi对于相同key的跨分区行为,当配置了需要更新旧分区的索引时,会对旧分区中的数据采用删除操作,在新分区中保留最新的数据,从而做到全局唯一。
5.2 执行hudi的写入操作
Flink进行Hudi的写入func是StreamWriteFunction,由于运行到这一步已经知道了数据需要写入到哪个fileId了,所以这一步只需要做到常规的持久化操作,本地内存缓存 -> flush磁盘,核心代码如下所示
protected void bufferRecord(HoodieRecord<?> value) {
final String bucketID = getBucketID(value);
// 从缓存中找到对应的bucket,如果缓存中没有则新建一个bucket
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
final DataItem item = DataItem.fromHoodieRecord(value);
bucket.records.add(item);
// 判断当前bucket是否达到写入的最小size
boolean flushBucket = bucket.detector.detect(item);
// 判断当前算子中所有bucket的数据量是否达到了写入的size
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
// flush当前bucket
if (flushBucket(bucket)) {
this.tracer.countDown(bucket.detector.totalSize);
bucket.reset();
}
} else if (flushBuffer) {
// 找到算子中最大的bucket进行flush
DataBucket bucketToFlush = this.buckets.values().stream()
.max(Comparator.comparingLong(b -> b.detector.totalSize))
.orElseThrow(NoSuchElementException::new);
if (flushBucket(bucketToFlush)) {
this.tracer.countDown(bucketToFlush.detector.totalSize);
bucketToFlush.reset();
} else {
LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
}
}
}
这里的bucket大小评估采用的抽样1%进行,当流量大或者数据大小不稳定的情况下,可以调抽样比例。
执行刷盘的过程核心代码
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
if (instant == null) {
LOG.info("No inflight instant when flushing data, skip.");
return false;
}
List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
// 在insert/upsert前执行去重操作
records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
.deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
}
bucket.preWrite(records);
// 将bucket执行flush磁盘操作,这一部份属于Hudi的写入过程,后期将新开一节进行分析
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
records.clear();
// 记录bucket刷写磁盘结果
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
.writeStatus(writeStatus)
.lastBatch(false)
.endInput(false)
.build();
// 将bucket刷写磁盘的结果同步到算子的coordinator
this.eventGateway.sendEventToCoordinator(event);
writeStatuses.addAll(writeStatus);
return true;
}
至此,Flink对Hudi的写入已经分析完了,数据已经flush到了磁盘了,那下游对数据的可见性、Hudi的写入事务性是在哪里体现的呢?
首先,Flink对于Hudi刷写磁盘的结果会同步到算子的coordinator,后续会通过coordinator来进行commit操作,这一步类似于kafka的两阶段提交,数据会先刷到存储磁盘,之后再统一提交commit操作,核心代码如下:
private void handleWriteMetaEvent(WriteMetadataEvent event) {
// 存在task在完成ckp后,再flush缓存中的数据,所以此时的instant会小于当前的instant
ValidationUtils.checkState(
HoodieTimeline.compareTimestamps(this.instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
String.format("Receive an unexpected event for instant %s from task %d",
event.getInstantTime(), event.getTaskID()));
addEventToBuffer(event);
}
private void addEventToBuffer(WriteMetadataEvent event) {
// 来自算子各task的flush结果,都会缓存在coordinator的eventBuffer中
if (this.eventBuffer[event.getTaskID()] != null) {
this.eventBuffer[event.getTaskID()].mergeWith(event);
} else {
this.eventBuffer[event.getTaskID()] = event;
}
}
上面的步骤会将这一次ckp中间各任务提交的flush结果发送给到coordinator,coodinator会将结果缓存在本地。
当所有任务完成了ckp后,coodinator会接收到ckp完成的通知,则会进行flush后的commit操作,核心代码如下:
public void notifyCheckpointComplete(long checkpointId) {
executor.execute(
() -> {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// 开始进行commit操作
final boolean committed = commitInstant(this.instant, checkpointId);
if (tableState.scheduleCompaction) {
// 如果设置异步compaction操作,则会在这儿进行plan
CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
}
if (tableState.scheduleClustering) {
// 如果设置异步cluster操作,则会在这儿进行调度
ClusteringUtil.scheduleClustering(conf, writeClient, committed);
}
if (committed) {
// 开启新一轮instant
startInstant();
// 同步hive元数据
syncHiveAsync();
}
}, "commits the instant %s", this.instant
);
}
详细的提交逻辑在commitInstant方法中,核心逻辑是将当前instant中发生变更的文件写入到对应的commit文件中,并重置eventBuffer,至此,数据已经完成提交,下游对数据已经可见了。
完整的写入及提交逻辑如下图所示:
这个写入过程很长,对于Flink而言,我们一般会要求具有Exactly-once语义,那么上述过程是是否能做到Exactly-once语义的?对于Hudi而言,在CAP上的考量又是怎样的?
6.Hudi文件compact过程详解
6.1 Compact plan的生成
Hudi的compact过程分为同步和异步,同步对性能影响很大,本节将重点对Hudi的异步compact进行解析,在上一章节,当配置了异步compact的时候,Flink会在完成commit后生成compact的plan,核心代码逻辑如下
public static void scheduleCompaction(
HoodieTableMetaClient metaClient,
HoodieFlinkWriteClient<?> writeClient,
boolean deltaTimeCompaction,
boolean committed) {
if (committed) {
// 当完成了提交后,会生成compact的plan
writeClient.scheduleCompaction(Option.empty());
} else if (deltaTimeCompaction) {
// 当配置了基于时间间隔调度compact的话,不管是否完成commit也会进行plan的生成
metaClient.reloadActiveTimeline();
Option<String> compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTime.isPresent()) {
writeClient.scheduleCompactionAtInstant(compactionInstantTime.get(), Option.empty());
}
}
}
我们核心跟scheduleCompaction方法,最终会进入到Table的scheduleCompaction方法,我们这里选择HoodieFlinkMergeOnReadTable进行跟读,首先会生成产出plan的执行器ScheduleCompactionActionExecutor,这个执行器会初始化一个Compact plan的生成器,也就是BaseHoodieCompactionPlanGenerator,最终根据执行器根据compact策略判断是否需要进行compact后,然后调用plan生成器,产出Compact的执行plan,核心代码如下
public Option<HoodieCompactionPlan> execute() {
...
//
HoodieCompactionPlan plan = scheduleCompaction();
Option<HoodieCompactionPlan> option = Option.empty();
if (plan != null && nonEmpty(plan.getOperations())) {
...
try {
if (operationType.equals(WriteOperationType.COMPACT)) {
// 持久化Hudi的数据文件的compact plan
HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, instantTime);
// 将生成的plan序列化并持久化到Hudi表目录下
table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
} else {
// 持久化Hudi的log文件的compact plan
HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.LOG_COMPACTION_ACTION, instantTime);
table.getActiveTimeline().saveToLogCompactionRequested(logCompactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
}
} catch (IOException ioe) {
...
}
option = Option.of(plan);
}
return option;
}
继续跟着scheduleCompaction方法解析会发现,首先会去拉取最近的DeltaCommit信息,然后根据compact执行策略进行比较,是否需要进行Compact操作,如果需要操作,则使用BaseHoodieCompactionPlanGenerator对DeltaCommit信息进行分组生成HoodieCompactionPlan,然后将HoodieCompactionPlan进行Avro编码,然后序列化到文件系统。
private HoodieCompactionPlan scheduleCompaction() {
// 根据最近的增量commit信息和compact策略进行判断是否需要compact
boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
if (compactable) {
try {
context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
// 生成compact plan,可进行avro编码,可以序列化
return planGenerator.generateCompactionPlan();
} catch (IOException e) {
... }
}
return new HoodieCompactionPlan();
}
6.2 Compact的执行与提交
compact的执行和提交过程的核心代码如下,
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
return dataStream.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
// 算子本身不执行,当checkpoint完成时,会去加载上一个算子持久化的compact plan,并进行下发,本算子必须设置为单并行度
new CompactionPlanOperator(conf))
.setParallelism(1)
// 按照fileId进行分区
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
// 根据接收到的compact plan进行merge操作,生成新的文件持久化到文件系统,由于篇幅限制,这一步不做详细解析,可以参考源码继续跟读
new CompactOperator(conf))
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
// 接收到上游完成compact后下发的结果,进行commit操作
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
.setParallelism(1);
}
完整的compact过程如下图所示
7.最后
本文通过跟读源码的方式对Flink写入Hudi的核心流程进行了解析,相信只有通过对写入流程的充分认识后,才能充分理解Flink和Hudi的各方面特性,才能充分提升线上任务的性能,由于本人能力有限,文章中出现的错误在所难免,希望大家发现后帮忙指正,万分感谢。
最后总结一下,本文主要解析了Flink将DataStream写入到Hudi表中的核心过程:
- HoodieRecord DataStream的生成过程,这个过程会进行索引的加载过程;
- HoodieRecord DataStream的写入与提交过程,Flink首先会根据索引进行分桶,找到数据对应的文件地址,然后将数据flush到磁盘,然后通过算子的coordinator根据checkpoint的完成进行commit操作,完整数据的写入过程;
- Hudi文件的compact过程,对于异步compact过程,Flink会在提交后去生成compact的执行plan,然后将plan序列化并持久化到Hudi表目录下,最后通过新的算子去读取Hudi表目录下的plan并执行compact,最后完成compact后进行commit操作,至此Flink完整的写入流程已经梳理完成了。
当然,本文由于篇幅有限,没有对Flink和Hudi架构和概念进行详细的介绍,同时对Flink写入Hudi的性能优化也没有涉及,后续会加上Flink写入Hudi的性能分析。