流数据湖平台Apache Paimon(三)Flink进阶使用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 流数据湖平台Apache Paimon(三)Flink进阶使用

2.9 进阶使用

2.9.1 写入性能

Paimon的写入性能与检查点密切相关,因此需要更大的写入吞吐量:

增加检查点间隔,或者仅使用批处理模式。

增加写入缓冲区大小。

启用写缓冲区溢出。

如果您使用固定存储桶模式,请重新调整存储桶数量。

2.9.1.1 并行度

建议sink的并行度小于等于bucket的数量,最好相等。

选项 必需的 默认 类型 描述
sink.parallelism No (none) Integer 定义sink的并行度。默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。

2.9.1.2 Compaction

当Sorted Run数量较少时,Paimon writer 将在单独的线程中异步执行压缩,因此记录可以连续写入表中。然而,为了避免Sorted Runs的无限增长,当Sorted Run的数量达到阈值时,writer将不得不暂停写入。下表属性确定阈值。

选项 必需的 默认 类型 描述
num-sorted-run.stop-trigger No (none) Integer 触发停止写入的Sorted Runs次数,默认值为 ‘num-sorted-run.compaction-trigger’ + 1。

当 num-sorted-run.stop-trigger 变大时,写入停顿将变得不那么频繁,从而提高写入性能。但是,如果该值变得太大,则查询表时将需要更多内存和 CPU 时间。如果您担心内存 OOM,请配置sort-spill-threshold。它的值取决于你的内存大小。

2.9.1.3 优先考虑写入吞吐量

如果希望某种模式具有最大写入吞吐量,则可以缓慢而不是匆忙地进行Compaction。可以对表使用以下策略

num-sorted-run.stop-trigger = 2147483647

sort-spill-threshold = 10

此配置将在写入高峰期生成更多文件,并在写入低谷期逐渐合并到最佳读取性能。

2.9.1.4 触发Compaction的Sorted Run数

Paimon使用LSM树,支持大量更新。 LSM 在多次Sorted Runs中组织文件。从 LSM 树查询记录时,必须组合所有Sorted Runs以生成所有记录的完整视图。

过多的Sorted Run会导致查询性能不佳。为了将Sorted Run的数量保持在合理的范围内,Paimon writers 将自动执行Compaction。下表属性确定触发Compaction的最小Sorted Run数。

选项 必需的 默认 类型 描述
num-sorted-run.compaction-trigger No 5 Integer 触发Compaction的Sorted Run数。包括 0 级文件(一个文件一级排序运行)和高级运行(一个一级排序运行)。

2.9.1.5 写入初始化

在write初始化时,bucket的writer需要读取所有历史文件。如果这里出现瓶颈(例如同时写入大量分区),可以使用write-manifest-cache缓存读取的manifest数据,以加速初始化。

2.9.1.6 内存

Paimon writer中主要占用内存的地方有3个:

Writer的内存缓冲区,由单个任务的所有Writer共享和抢占。该内存值可以通过 write-buffer-size 表属性进行调整。

合并多个Sorted Run以进行Compaction时会消耗内存。可以通过 num-sorted-run.compaction-trigger 选项进行调整,以更改要合并的Sorted Run的数量。

如果行非常大,在进行Compaction时一次读取太多行数据可能会消耗大量内存。减少 read.batch-size 选项可以减轻这种情况的影响。

写入列式(ORC、Parquet等)文件所消耗的内存,不可调。

2.9.2 读取性能

2.9.2.1 Full Compaction

配置“full-compaction.delta-commits”在Flink写入中定期执行full-compaction。并且可以确保在写入结束之前分区被完全Compaction。

注意:Paimon 默认处理小文件并提供良好的读取性能。请不要在没有任何要求的情况下配置此Full Compaction选项,因为它会对性能产生重大影响。

2.9.2.2 主键表

对于主键表来说,这是一种“MergeOnRead”技术。读取数据时,会合并多层LSM数据,并行数会受到桶数的限制。虽然Paimon的merge会高效,但是还是赶不上普通的AppendOnly表。

如果你想在某些场景下查询得足够快,但只能找到较旧的数据,你可以:

配置full-compaction.delta-commits,写入数据时(目前只有Flink)会定期进行full Compaction。

配置“scan.mode”为“compacted-full”,读取数据时,选择full-compaction的快照。读取性能良好。

2.9.2.3 仅追加表

小文件会降低读取速度并影响 DFS 稳定性。默认情况下,当单个存储桶中的小文件超过“compaction.max.file-num”(默认50个)时,就会触发compaction。但是当有多个桶时,就会产生很多小文件。

您可以使用full-compaction来减少小文件。full-compaction将消除大多数小文件。

2.9.2.4 格式

Paimon 对 parquet 读取进行了一些查询优化,因此 parquet 会比 orc 稍快一些。

2.9.3 多Writer并发写入

Paimon的快照管理支持向多个writer写入。

默认情况下,Paimon支持对不同分区的并发写入。推荐的方式是streaming job将记录写入Paimon的最新分区;同时批处理作业(覆盖)将记录写入历史分区。

如果需要多个Writer写到同一个分区,事情就会变得有点复杂。例如,不想使用 UNION ALL,那就需要有多个流作业来写入“partial-update”表。参考如下的“Dedicated Compaction Job”。

2.9.3.1 Dedicated Compaction Job

默认情况下,Paimon writer 在写入记录时会根据需要执行Compaction。这对于大多数用例来说已经足够了,但有两个缺点:

这可能会导致写入吞吐量不稳定,因为执行压缩时吞吐量可能会暂时下降。

Compaction会将某些数据文件标记为“已删除”(并未真正删除)。如果多个writer标记同一个文件,则在提交更改时会发生冲突。 Paimon 会自动解决冲突,但这可能会导致作业重新启动。

为了避免这些缺点,用户还可以选择在writer中跳过Compaction,并仅运行专门的作业来进行Compaction。由于Compaction仅由专用作业执行,因此writer可以连续写入记录而无需暂停,并且不会发生冲突。

选项 必需的 默认 类型 描述
write-only No false Boolean 如果设置为 true,将跳过Compaction和快照过期。此选项与独立Compaction一起使用。

Flink SQL目前不支持compaction相关的语句,所以我们必须通过flink run来提交compaction作业。

/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
compact \
–warehouse \
–database \
–table \
[–partition ] \
[–catalog-conf [–catalog-conf …]] \

如果提交一个批处理作业(execution.runtime-mode:batch),当前所有的表文件都会被Compaction。如果您提交一个流作业(execution.runtime-mode: Streaming),该作业将持续监视表的新更改并根据需要执行Compaction。

2.9.4 表管理

2.9.4.1 管理快照

1)快照过期

Paimon Writer每次提交都会生成一个或两个快照。每个快照可能会添加一些新的数据文件或将一些旧的数据文件标记为已删除。然而,标记的数据文件并没有真正被删除,因为Paimon还支持时间旅行到更早的快照。它们仅在快照过期时被删除。

目前,Paimon Writer在提交新更改时会自动执行过期操作。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件,以释放磁盘空间。

设置以下表属性:

选项 必需的 默认 类型 描述
snapshot.time-retained No 1 h Duration 已完成快照的最长时间保留。
snapshot.num-retained.min No 10 Integer 要保留的已完成快照的最小数量。
snapshot.num-retained.max No Integer.MAX_VALUE Integer 要保留的已完成快照的最大数量。

注意,保留时间太短或保留数量太少可能会导致如下问题:

批量查询找不到该文件。例如,表比较大,批量查询需要10分钟才能读取,但是10分钟前的快照过期了,此时批量查询会读取到已删除的快照。

表文件上的流式读取作业(没有外部日志系统)无法重新启动。当作业重新启动时,它记录的快照可能已过期。 (可以使用Consumer Id来保护快照过期的小保留时间内的流式读取)。

2)回滚快照

/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
rollback-to \
–warehouse \
–database \
–table \
–snapshot \
[–catalog-conf [–catalog-conf …]]

2.9.4.2 管理分区

创建分区表时可以设置partition.expiration-time。 Paimon会定期检查分区的状态,并根据时间删除过期的分区。

判断分区是否过期:将分区中提取的时间与当前时间进行比较,看生存时间是否超过partition.expiration-time。比如:

CREATE TABLE T (…) PARTITIONED BY (dt) WITH (
‘partition.expiration-time’ = ‘7 d’,
‘partition.expiration-check-interval’ = ‘1 d’,
‘partition.timestamp-formatter’ = ‘yyyyMMdd’
);
选项 默认 类型 描述
partition.expiration-check-interval 1 h Duration 分区过期的检查间隔。
partition.expiration-time (none) Duration 分区的过期时间间隔。如果分区的生命周期超过此值,则该分区将过期。分区时间是从分区值中提取的。
partition.timestamp-formatter (none) String 用于格式化字符串时间戳的格式化程序。它可以与“partition.timestamp-pattern”一起使用来创建使用指定值的格式化程序。> 默认格式化程序为“yyyy-MM-dd HH:mm:ss”和“yyyy-MM-dd”。> 支持多个分区字段,例如“y e a r − year-yearmonth-$day $hour:00:00”。> 时间戳格式化程序与 Java 的 DateTimeFormatter 兼容。
partition.timestamp-pattern (none) String 可以指定一种模式来从分区获取时间戳。格式化程序模式由“partition.timestamp-formatter”定义。> 默认情况下,从第一个字段读取。> 如果分区中的时间戳是名为“dt”的单个字段,则可以使用“d t ”。 > 如果它分布在年、月、日和小时的多个字段中,则可以使用“ dt”。> 如果它分布在年、月、日和小时的多个字段中,则可以使用“dt>如果它分布在年、月、日和小时的多个字段中,则可以使用year-m o n t h − month-monthday h o u r : 00 : 00 ”。 > 如果时间戳位于 d t 和 h o u r 字段中,则可以使用“ hour:00:00”。> 如果时间戳位于 dt 和 hour 字段中,则可以使用“hour:00:00”>如果时间戳位于dthour字段中,则可以使用dt $hour:00:00”。

2.9.4.3 管理小文件

小文件可能会导致:

稳定性问题:HDFS中小文件过多,NameNode会承受过大的压力。

成本问题:HDFS中的小文件会暂时使用最小1个Block的大小,例如128MB。

查询效率:小文件过多查询效率会受到影响。

1)FlinkCheckpoint的影响

使用Flink Writer,每个checkpoint会生成 1-2 个快照,并且checkpoint会强制在 DFS 上生成文件,因此checkpoint间隔越小,会生成越多的小文件。

默认情况下,不仅checkpoint会导致文件生成,writer的内存(write-buffer-size)耗尽也会将数据flush到DFS并生成相应的文件。可以启用 write-buffer-spillable 在 writer 中生成溢出文件,从而在 DFS 中生成更大的文件。

所以,可以设置如下:

增大checkpoint间隔

增加 write-buffer-size 或启用 write-buffer-spillable

2)快照的影响

Paimon维护文件的多个版本,文件的Compaction和删除是逻辑上的,并没有真正删除文件。文件只有在 Snapshot 过期后才会被真正删除,因此减少文件的第一个方法就是减少 Snapshot 过期的时间。 Flink writer 会自动使快照过期。

分区和分桶的影响

表数据会被物理分片到不同的分区,里面有不同的桶,所以如果整体数据量太小,单个桶中至少有一个文件,建议你配置较少的桶数,否则会出现也有很多小文件。

3)主键表LSM的影响

LSM 树将文件组织成Sorted Runs的运行。Sorted Runs由一个或多个数据文件组成,并且每个数据文件恰好属于一个Sorted Runs。

默认情况下,Sorted Runs数取决于 num-sorted-run.compaction-trigger,这意味着一个桶中至少有 5 个文件。如果要减少此数量,可以保留更少的文件,但写入性能可能会受到影响。

4)仅追加表的文件的影响

默认情况下,Append-Only 还会进行自动Compaction以减少小文件的数量

对于分桶的 Append-only 表,为了排序会对bucket内的文件行Compaction,可能会保留更多的小文件。

5)Full-Compaction的影响

主键表是5个文件,但是Append-Only表(桶)可能单个桶里有50个小文件,这是很难接受的。更糟糕的是,不再活动的分区还保留了如此多的小文件。

建议配置Full-Compaction,在Flink写入时配置‘full-compaction.delta-commits’定期进行full-compaction。并且可以确保在写入结束之前分区被full-compaction。

2.9.5 缩放Bucket

1)说明

由于总桶数对性能影响很大,Paimon 允许用户通过 ALTER TABLE 命令调整桶数,并通过 INSERT OVERWRITE 重新组织数据布局,而无需重新创建表/分区。当执行覆盖作业时,框架会自动扫描旧桶号的数据,并根据当前桶号对记录进行哈希处理。

– rescale number of total buckets
ALTER TABLE table_identifier SET (‘bucket’ = ‘…’)
– reorganize data layout of table/partition
INSERT OVERWRITE table_identifier [PARTITION (part_spec)]
SELECT …
FROM table_identifier
[WHERE part_spec]
注意:
ALTER TABLE 仅修改表的元数据,不会重新组织或重新格式化现有数据。重新组织现有数据必须通过INSERT OVERWRITE来实现。
重新缩放桶数不会影响读取和正在运行的写入作业。
一旦存储桶编号更改,任何新安排的 INSERT INTO 作业写入未重新组织的现有表/分区将抛出 TableException ,并显示如下类似异常:
Try to write table/partition … with a new bucket num …,
but the previous bucket num is … Please switch to batch mode,
and perform INSERT OVERWRITE to rescale current data layout first.
对于分区表,不同的分区可以有不同的桶号。例如:
ALTER TABLE my_table SET (‘bucket’ = ‘4’);
INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-01’)
SELECT * FROM …;
ALTER TABLE my_table SET (‘bucket’ = ‘8’);
INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-02’)
SELECT * FROM …;

在覆盖期间,确保没有其他作业写入同一表/分区。

注意:对于启用日志系统的表(例如Kafka),请重新调整主题的分区以保持一致性。

重新缩放存储桶有助于处理吞吐量的突然峰值。假设有一个每日流式ETL任务来同步交易数据。该表的DDL和管道如下所示。

2)官方示例:

如下是正在跑的一个作业:

– 建表
CREATE TABLE verified_orders (
trade_order_id BIGINT,
item_id BIGINT,
item_price DOUBLE,
dt STRING,
PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED
) PARTITIONED BY (dt)
WITH (
‘bucket’ = ‘16’
);
– kafka表
CREATE temporary TABLE raw_orders(
trade_order_id BIGINT,
item_id BIGINT,
item_price BIGINT,
gmt_create STRING,
order_status STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘…’,
‘properties.bootstrap.servers’ = ‘…’,
‘format’ = ‘csv’
);
– 流式插入16个分桶
INSERT INTO verified_orders
SELECT trade_order_id,
 item_id,
 item_price,
 DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt
FROM raw_orders
WHERE order_status = ‘verified’;

过去几周运行良好。然而,最近数据量增长很快,作业的延迟不断增加。为了提高数据新鲜度,用户可以执行如下操作缩放分桶:

(1)使用保存点暂停流作业

$ ./bin/flink stop \
–savepointPath /tmp/flink-savepoints \
$JOB_ID

(2)增加桶数

ALTER TABLE verified_orders SET (‘bucket’ = ‘32’);

(3)切换到批处理模式并覆盖流作业正在写入的当前分区

SET ‘execution.runtime-mode’ = ‘batch’;

– 假设今天是2022-06-22

– 情况1:没有更新历史分区的延迟事件,因此覆盖今天的分区就足够了

INSERT OVERWRITE verified_orders PARTITION (dt = ‘2022-06-22’)
SELECT trade_order_id,
 item_id,
 item_price
FROM verified_orders
WHERE dt = ‘2022-06-22’;

- 情况2:有更新历史分区的延迟事件,但范围不超过3天

INSERT OVERWRITE verified_orders
SELECT trade_order_id,
 item_id,
 item_price,
 dt
FROM verified_orders
WHERE dt IN (‘2022-06-20’, ‘2022-06-21’, ‘2022-06-22’);

(4)覆盖作业完成后,切换回流模式,从保存点恢复(可以增加并行度=新bucket数量)。

SET ‘execution.runtime-mode’ = ‘streaming’;
SET ‘execution.savepoint.path’ = ;
INSERT INTO verified_orders
SELECT trade_order_id,
item_id,
item_price,
DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt
FROM raw_orders
WHERE order_status = ‘verified’;

2.10 文件操作理解

2.10.1 插入数据

当我们执行INSERT INTO
CREATE CATALOG paimon WITH (
‘type’ = ‘paimon’,
‘warehouse’ = ‘file:///tmp/paimon’
);
USE CATALOG paimon;
CREATE TABLE T (
id BIGINT,
a INT,
b STRING,
dt STRING COMMENT ‘timestamp string in format yyyyMMdd’,
PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);
INSERT INTO T VALUES (1, 10001, ‘varchar00001’, ‘20230501’);

一旦Flink作业完成,记录就会通过成功提交写入Paimon表中。用户可以通过执行查询 SELECT * FROM T 来验证这些记录的可见性,该查询将返回单行。提交过程创建位于路径 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。 snapshot-1 处生成的文件布局如下所述:

snapshot-1 的内容包含快照的元数据,例如清单列表(manifest list)和schema ID:

{
“version” : 3,
“id” : 1,
“schemaId” : 0,
“baseManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0”,
“deltaManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1”,
“changelogManifestList” : null,
“commitUser” : “7d758485-981d-4b1a-a0c6-d34c3eb254bf”,
“commitIdentifier” : 9223372036854775807,
“commitKind” : “APPEND”,
“timeMillis” : 1684155393354,
“logOffsets” : { },
“totalRecordCount” : 1,
“deltaRecordCount” : 1,
“changelogRecordCount” : 0,
“watermark” : -9223372036854775808
}
清单列表包含快照的所有更改,baseManifestList 是应用 deltaManifestList 中的更改的基础文件。第一次提交将生成 1 个清单文件(manifest file),并创建 2 个清单列表(manifest list):
./T/manifest:
–deltaManifestList:包含对数据文件执行操作的清单条目列表(上图中的 manifest-list-1-delta)
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1
–baseManifestList:空的(上图中的 manifest-list-1-base)
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0
–清单文件:存储快照中数据文件的信息(上图中的manifest-1-0)
manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0

跨不同分区插入一批记录:

INSERT INTO T VALUES
(2, 10002, ‘varchar00002’, ‘20230502’),
(3, 10003, ‘varchar00003’, ‘20230503’),
(4, 10004, ‘varchar00004’, ‘20230504’),
(5, 10005, ‘varchar00005’, ‘20230505’),
(6, 10006, ‘varchar00006’, ‘20230506’),
(7, 10007, ‘varchar00007’, ‘20230507’),
(8, 10008, ‘varchar00008’, ‘20230508’),
(9, 10009, ‘varchar00009’, ‘20230509’),
(10, 10010, ‘varchar00010’, ‘20230510’);

第二次提交发生,执行 SELECT * FROM T 将返回 10 行。创建一个新快照,即 snapshot-2,并为我们提供以下物理文件布局:

% ls -atR .
./T:
dt=20230501
dt=20230502
dt=20230503
dt=20230504
dt=20230505
dt=20230506
dt=20230507
dt=20230508
dt=20230509
dt=20230510
snapshot
schema
manifest
./T/snapshot:
LATEST
snapshot-2
EARLIEST
snapshot-1
./T/manifest:
manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2
manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2
manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # manifest file for snapshot-2
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1
manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1
manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # manifest file for snapshot-1
./T/dt=20230501/bucket-0:
data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc
# each partition has the data written to bucket-0
./T/schema:
schema-0

截至 snapshot-2 的新文件布局如下所示:

2.10.2 删除数据

执行如下删除:

DELETE FROM T WHERE dt >= ‘20230503’;

第三次提交发生,它为我们提供了 snapshot-3。现在,列出表下的文件,您会发现没有分区被删除。相反,会为分区 20230503 到 20230510 创建一个新的数据文件:

./T/dt=20230510/bucket-0:

data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # newer data file created by the delete statement

data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # older data file created by the insert statement

因为我们在第二次提交中插入一条记录(由 +I[10, 10010, ‘varchar00010’, ‘20230510’] 表示),然后在第三次提交中删除该记录。执行 SELECT * FROM T 将返回 2 行,即:

+I[1, 10001, ‘varchar00001’, ‘20230501’]

+I[2, 10002, ‘varchar00002’, ‘20230502’]

截至 snapshot-3 的新文件布局如下所示

manifest-3-0包含8个ADD操作类型的manifest条目,对应8个新写入的数据文件。

2.10.3 Compaction

小文件的数量会随着连续快照的增加而增加,这可能会导致读取性能下降。因此,需要进行full compaction以减少小文件的数量。

现在触发full-compaction:

./bin/flink run \
./lib/paimon-flink-action-0.5-SNAPSHOT.jar \
compact \
–path file:///tmp/paimon/default.db/T
所有当前表文件将被压缩,并创建一个新快照,即 snapshot-4,并包含以下信息:
{
“version” : 3,
“id” : 4,
“schemaId” : 0,
“baseManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0”,
“deltaManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1”,
“changelogManifestList” : null,
“commitUser” : “a3d951d5-aa0e-4071-a5d4-4c72a4233d48”,
“commitIdentifier” : 9223372036854775807,
“commitKind” : “COMPACT”,
“timeMillis” : 1684163217960,
“logOffsets” : { },
“totalRecordCount” : 38,
“deltaRecordCount” : 20,
“changelogRecordCount” : 0,
“watermark” : -9223372036854775808
}

截至 snapshot-4 的新文件布局如下所示

manifest-4-0 包含 20 个清单条目(18 个 DELETE 操作和 2 个 ADD 操作):

对于分区20230503到20230510,对两个数据文件进行两次DELETE操作

对于分区20230501到20230502,对同一个数据文件进行1次DELETE操作和1次ADD操作。

2.10.4 修改表

执行以下语句来配置full-compaction:

ALTER TABLE T SET (‘full-compaction.delta-commits’ = ‘1’);

它将为 Paimon 表创建一个新schema,即 schema-1,但在下一次提交之前还没有快照实际使用该schema。

2.10.5 过期快照

在快照过期的过程中,首先确定快照的范围,然后将这些快照内的数据文件标记为删除。仅当存在引用特定数据文件的类型为 DELETE 的清单条目时,数据文件才会被标记为删除。此标记可确保该文件不会被后续快照使用并可以安全删除。

假设上图中的所有 4 个快照都即将过期。过期流程如下:

它首先删除所有标记的数据文件,并记录任何更改的存储桶。

然后它会删除所有更改日志文件和关联的清单。

最后,它删除快照本身并写入最早的提示文件。

如果删除过程后有任何目录留空,它们也将被删除。

假设创建了另一个快照 snapshot-5 并触发了快照过期。 snapshot-1 到 snapshot-4 被删除。为简单起见,我们将只关注以前快照中的文件,快照过期后的最终布局如下所示:

结果,分区20230503至20230510被物理删除。

2.10.6 Flink 流式写入

用 CDC 摄取的示例来说明 Flink Stream Write。本节将讨论更改数据的捕获和写入 Paimon,以及异步Compaction和快照提交和过期背后的机制。

CDC 摄取工作流程以及所涉及的每个组件所扮演的独特角色:

(1)MySQL CDC Source统一读取快照和增量数据,分别由SnapshotReader读取快照数据和BinlogReader读取增量数据。

(2)Paimon Sink将数据写入桶级别的Paimon表中。其中的CompactManager将异步触发Compaction。

(3)Committer Operator 是一个单例,负责提交和过期快照。

端到端数据流:

MySQL Cdc Source读取快照和增量数据,并在规范化后将它们发送到下游:

Paimon Sink 首先将新记录缓冲在基于堆的 LSM 树中,并在内存缓冲区满时将它们刷新到磁盘。请注意,写入的每个数据文件都是Sorted Run。此时,还没有创建清单文件和快照。在 Flink 检查点发生之前,Paimon Sink 将刷新所有缓冲记录并向下游发送可提交消息,该消息在检查点期间由 Committer Operator 读取并提交:

在检查点期间,Committer Operator 将创建一个新快照并将其与清单列表关联起来,以便该快照包含有关表中所有数据文件的信息:

稍后可能会发生异步Compaction,CompactManager 生成的提交表包含有关先前文件和合并文件的信息,以便 Committer Operator 可以构造相应的清单条目。在这种情况下,Committer Operator 可能会在 Flink 检查点期间生成两个快照:

一个用于写入数据(Append 类型的快照),

另一个用于compact(Compact 类型的快照)。

如果在检查点间隔期间没有写入数据文件,则只会创建 Compact 类型的快照。 Committer Operator 将检查快照是否过期并执行标记数据文件的物理删除。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3天前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3天前
|
消息中间件 关系型数据库 MySQL
Apache Flink CDC 3.1.0 发布公告
Apache Flink 社区很高兴地宣布发布 Flink CDC 3.1.0!
201 0
Apache Flink CDC 3.1.0 发布公告
|
3天前
|
Java 关系型数据库 数据库连接
实时计算 Flink版操作报错之遇到错误org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc',该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3天前
|
SQL 关系型数据库 Java
实时计算 Flink版操作报错之在阿里云DataHub平台上执行SQL查询GitHub新增star仓库Top 3时不显示结果,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
8天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
69 0
|
8天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
72 5
|
8天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
94 1
|
8天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之报错:WARN (org.apache.kafka.clients.consumer.ConsumerConfig:logUnused)这个错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
40 3
|
11天前
|
机器学习/深度学习 人工智能 流计算
人工智能平台PAI 操作报错合集之在集群上提交了包含alink相关功能的flink任务,但是却报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
11天前
|
存储 消息中间件 运维
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
本文主要分享友盟+ U-App 整体的技术架构,以及在实时和离线计算上面的优化方案。
364 1
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案

推荐镜像

更多