- 主键表
- 仅追加表
- 快照管理
- 文件布局
- Java API
- Flink API
- 读性能
- 写性能
主键表(Primary Key Table)
Merge Engines
sink-upsert-realize可能会导致不正常的现象。当输入乱序时,我们建议您使用序列字段来纠正无序。建议设置为None。
set table.exec.sink.upsert-materialize = NONE
Deduplicate
如果用户建表时不指定 merge-engine 配置,创建的 PK 表默认的 Merge Engine 是 deduplicate 即只保留最新的记录,其他的同 PK 数据则被丢弃,如果最新的记录是 DELETE 记录,那么相同 PK 的所有数据都将被删除。
'merge-engine' = 'Deduplicate'
Partial Update
partial-update 必须跟 lookup 或者 full-compaction changelog producer结合使用。Partial无法接收DELETE消息,可以将partial-update.ignore-delete配置为忽略delete消息。
如果用户建表时指定'merge-engine' = 'partial-update',那么就会使用部分更新表引擎,可以做到多个 Flink 流任务去更新同一张表,每条流任务只更新一张表的部分列,最终实现一行完整的数据的更新,对于需要拉宽表的业务场景,partial-update 非常适合此场景,而且构建宽表的操作也相对简单。这里所说的多个 Flink 流任务并不是指多个 Flink Job 并发写同一张 Paimon 表,这样需要拆分 Compaction 任务,就不能在每个 Job 的 Writer 端做 Compaction, 需要一个独立的 Compaction 任务,比较麻烦。目前推荐将多条 Flink 流任务 UNION ALL 起来,启动一个 Job 写 Paimon 表。这里需要注意的是,对于流读场景,partial-update 表引擎需要结合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,同时 partial-update 不能接收和处理 DELETE 消息,为了避免接收到 DELETE 消息报错,需要通过配置 'partial-update.ignore-delete' = 'true' 忽略 DELETE 消息。
--创建Partial update结果表 CREATE TABLE if not EXISTS paimon.dw.order_detail ( `order_id` string ,`product_type` string ,`plat_name` string ,`ref_id` bigint ,`start_city_name` string ,`end_city_name` string ,`create_time` timestamp(3) ,`update_time` timestamp(3) ,`dispatch_time` timestamp(3) ,`decision_time` timestamp(3) ,`finish_time` timestamp(3) ,`order_status` int ,`binlog_time` bigint ,PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'bucket' = '20', -- 指定20个bucket 'bucket-key' = 'order_id', 'sequence.field' = 'binlog_time', -- 记录排序字段 'changelog-producer' = 'full-compaction', -- 选择 full-compaction ,在compaction后产生完整的changelog 'changelog-producer.compaction-interval' = '2 min', -- compaction 间隔时间 'merge-engine' = 'partial-update', 'partial-update.ignore-delete' = 'true' -- 忽略DELETE数据,避免运行报错 );
Aggregation
如果用户建表时指定 'merge-engine' = 'aggregation',此时使用聚合表引擎,可以通过聚合函数做一些预聚合,每个除主键以外的列都可以指定一个聚合函数,相同主键的数据就可以按照列字段指定的聚合函数进行相应的预聚合,如果不指定则默认为 last-non-null-value ,空值不会覆盖。Agg 表引擎也需要结合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,需要注意的是除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配置 'fields.${field_name}.ignore-retract'='true' 忽略。
CREATE TABLE MyTable ( product_id BIGINT, price DOUBLE, sales BIGINT, PRIMARY KEY (product_id) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', 'fields.price.aggregate-function' = 'max', 'fields.sales.aggregate-function' = 'sum' );
change producer
Changelog 主要应用在流读场景。流式查询将不断产生最新的更改。这些更改可以来自底层表文件,也可以来自像Kafka这样的外部日志系统。与外部日志系统相比,表文件的更改成本较低,但延迟较高(取决于创建快照的频率)。
通过在创建表时指定变更日志生产者表属性,用户可以选择从文件中生成的变更模式。
目前数仓分层是在 Paimon 里做的,数据以 Table Format 的形式存储在文件系统上,如果下游的 Flink 任务要流读 Paimon 表数据,需要存储帮助生成 Changelog(成本较低,但延迟相对较高),以便下游流读的,这时就需要我们在建表时指定 Paimon 的 Changelog Producer 决定以何种方式在何时生成 Changelog。如果不指定则不会在写入 Paimon 表的时候生成 Changelog,那么下游任务需要在流读时生成一个物化节点来产生 Changelog。这种方式的成本相对较高,同时官方不建议这样使用,因为下游任务在 State 中存储一份全量的数据,即每条数据以及其变更记录都需要保存在状态中。
Paimon 支持的 Changelog Produer 包括:
none:如果不指定,默认就是 none,成本较高,不建议使用。
总之,'changelog-producer' = 'none' 最适合数据库系统这样的消费者。Flink还有一个内置的“normalize”操作符,它将每个键的值保持在状态中。可以很容易地看出,这种操作成本非常高,应该避免。
input:如果我们的 Source 源是业务库的 Binlog ,即写入 Paimon 表 Writer 任务的输入是完整的 Changelog,此时能够完全依赖输入端的 Changelog, 并且将输入端的 Changelog 保存到 Paimon 的 Changelog 文件,由 Paimon Source 提供给下游流读。通过配置 'changelog-producer' = 'input',将 Changelog Producer 设置为 input 。
通过指定'changelog-producer' = 'input',Paimon编写器将其输入作为完整变更日志的来源。所有输入记录将保存在单独的变更日志文件中,并由Paimon来源提供给消费者。最适合cdc或者flink有状态计算场景。以下为一个flink cdc如湖的demo
EXECUTE CDCSOURCE cdc_demo WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'username', 'password' = 'password', 'checkpoint' = '30000', 'scan.startup.mode' = 'initial', 'source.server-time-zone' = 'Asia/Tokyo', 'parallelism' = '4', 'database-name' = 'demo', 'sink.connector' = 'sql-catalog', 'sink.catalog.name' = 'fts_hive', 'sink.catalog.type' = 'fts_hive', 'sink.catalog.uri' = 'thrift://localhost:9083', 'sink.bucket' = '4', 'sink.snapshot.time-retained' = '24h', 'table-list' = 'A01,A02,A03,A04,A05', 'sink.changelog-producer' = 'input', 'sink.catalog.warehouse' = 'hdfs://cluster/warehouse/table_store', 'sink.sink.db' = 'fts_ods_db_demo' );
lookup:如果我们的输入不是完整的 Changelog, 并且不想在下游流读时通过 Normalize 节点生成 Changelog, 通过配置 'changelog-producer' = 'lookup',通过 Lookup 的方式在数据写入的时候生成 Changelog,此 Changelog Produer 目前处于实验状态,暂未经过大量的生产验证。
Paimon将在提交数据写入之前通过'lookup'生成变更日志。
full-compaction:除了以上几种方式,通过配置 'changelog-producer' = 'full-compaction' 将 Changelog Producer 设置为 full-compaction,Writer 端在 Compaction 后产生完整的 Changelog,并且写入到 Changelog 文件。通过设置 changelog-producer.compaction-interval 配置项控制 Compaction 的间隔和频率,不过此参数计划弃用,建议使用 full-compaction.delta-commits,此配置下默认为1 即每次提交都做 Compaction。
如果您认为'lookup'的资源消耗太大,可以考虑使用'full-compaction' changelog producer,它可以将数据写入和变更日志生成解耦,更适合高延迟的场景(例如,10分钟)。通过指定'changelog producer'='full-compaction',Paimon将比较完全压缩之间的结果,并将差异作为changelog生成。变更日志的延迟受完全压缩频率的影响。通过指定full-compaction.delta-commits表属性,完全压缩将在delta提交(检查点)后不断触发。默认情况下,它设置为1,因此每个检查点都将有一个完整的压缩并生成一个更改日志。
--CREATE TABLE create table t_dwd_table( ...... id string, gn string, dt string, PRIMARY KEY (gn, id, log_create_unix_time, dt) NOT ENFORCED ) partitioned by (gn, dt) WITH ( 'bucket' = '8', 'bucket-key' = 'id', 'changelog-producer' = 'full-compaction', 'changelog-producer.compaction-interval' = '54s', 'snapshot.time-retained' = '24h' );
仅追加表(Append Only Table)
Bucketing
未分区的表或分区表中的分区被细分为桶,为数据提供额外的结构,可用于更高效的查询。
bucket的范围由记录中一个或多个列的哈希值确定。用户可以通过提供bucket键选项来指定bucketing列。如果未指定bucket key选项,则主键(如果已定义)或完整记录将用作bucket key。
存储桶是最小的读写存储单元,因此存储桶的数量限制了最大的处理并行性。不过,这个数字不应该太大,因为这会导致大量小文件和低读取性能。通常,每个bucket中的推荐数据大小约为1GB。可以为仅追加表定义bucket编号.建议设置bucket key字段。否则,数据将按照整行进行哈希,性能会很差。
建表时配置 'write-mode' = 'append-only',用户可以创建 Append Only 表。Append Only 表采用追加写的方式,只能插入一条完整的记录,不能更新和删除,也无需定义主键。Append Only 表主要用于无需更新的场景,例如 ODS 层数据将 Kafka 埋点日志数据解析后写入到 Paimon 表,保留原貌不做任何更新,此时推荐采用 Paimon 的 Append Only 表。 需要注意的是由于 Append Only 表没有主键,用户必须指定 bucket-key,否则采用整行数据做 Hash 效率偏低。
当我们采用 Paimon 来构建 AppendOnly 表时,数据不仅可以实时写入,还可以实时读取,读写顺序一致,而且实时资源消耗也降低了不少完全可以替换部分消息队列的场景,达到解耦和降本增效的效果。SQL 如下:
CREATE TABLE if not exists paimon.ods.event_log( ....... ) PARTITIONED BY (......) WITH ( 'bucket' = '100', 'bucket-key' = 'uuid', 'snapshot.time-retained' = '7 d', 'write-mode' = 'append-only' ); INSERT INTO paimon.ods.event_log SELECT ....... FROM realtime_event_kafka_source ;
Compaction
Streaming Source
streaming source目前仅在支持flink引擎。
流读顺序
对于来自两个不同分区的任意两条记录 如果scan.plan-sort-partition设置为true,则将首先生成分区值较小的记录。 否则,将首先生成分区创建时间较早的记录。 对于来自同一分区和同一存储桶的任意两条记录,将首先生成第一条写入的记录。 对于来自同一分区但有两个不同存储桶的任意两条记录,不同的存储桶由不同的任务处理,它们之间没有顺序保证。
watermark定义
CREATE TABLE T ( `user` BIGINT, product STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH (...); -- launch a bounded streaming job to read paimon_table SELECT window_start, window_end, COUNT(`user`) FROM TABLE( TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;
有界流
Streaming Source也可以是有界的,可以指定“scan.bounded.watermark”来定义有界流模式的结束条件,流读取将结束,直到遇到更大的水印快照。
快照中的水印是由作者生成的,例如,您可以指定一个kafka源并声明水印的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将生成相应的水印,这样您就可以在流式读取此Paimon表时使用有界水印的功能。
CREATE TABLE kafka_table ( `user` BIGINT, product STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ('connector' = 'kafka'...); -- launch a streaming insert job INSERT INTO paimon_table SELECT * FROM kakfa_table; -- launch a bounded streaming job to read paimon_table SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
快照管理
快照过期
Paimon写生成器每次提交都会生成一到两个快照。每个快照可以添加一些新的数据文件或将一些旧的数据文件标记为已删除。然而,标记的数据文件并没有被真正删除, 因为Paimon还支持时间旅行到更早的快照。只有在快照过期时才会删除它们。
目前,Paimon编写器在提交新更改时会自动执行过期操作。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件以释放磁盘空间。
快照过期由以下表属性控制。请注意,保留时间过短或保留数量过少可能导致:
批处理查询找不到该文件。例如,表相对较大,批处理查询需要10分钟才能读取,但10分钟前的快照过期,此时批处理查询将读取已删除的快照。 无法重新启动表文件上的流式读取作业(没有外部日志系统)。当作业重新启动时,它记录的快照可能已过期。(您可以使用Consumer Id在快照过期的一小段保留时间内保护流式读取)。
快照回滚
<FLINK_HOME>/bin/flink run \ /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \ rollback-to \ --warehouse <warehouse-path> \ --database <database-name> \ --table <table-name> \ --snapshot <snapshot-id>
文件布局
## snapshot files 所有快照文件都存储在快照目录中。 快照文件是一个JSON文件,包含有关此快照的信息,包括正在使用的schema文件、包含此快照的所有更改的清单列表 ## manifest lists 所有清单列表和清单文件都存储在清单目录中。 清单列表是清单文件名的列表。 清单文件是包含有关LSM数据文件和变更日志文件的更改的文件。例如,在相应的快照中创建了哪个LSM数据文件,删除了哪个文件。 ## Data files 数据文件按分区和存储桶分组。每个bucket目录都包含一个LSM树及其变更日志文件。 目前,Paimon支持使用orc(默认)、parquet和avro作为数据文件的格式。 ## LSM Soreted-Runs LSM树将文件组织为几个排序的运行。排序运行由一个或多个数据文件组成,每个数据文件恰好属于一个排序运行。 数据文件中的记录按其主键进行排序。在排序运行中,数据文件的主键范围永远不会重叠。 压缩 当越来越多的记录被写入LSM树时,排序的运行次数将增加。因为查询LSM树需要组合所有排序的运行,所以过多的排序运行将导致查询性能不佳,甚至内存不足。 为了限制排序运行的数量,我们必须偶尔将几个排序运行合并为一个大的排序运行。此过程称为压实。 然而,压缩是一个资源密集型过程,需要消耗一定的CPU时间和磁盘IO,因此过于频繁的压缩可能会导致写入速度变慢。这是查询性能和写入性能之间的权衡。Paimon目前采用的压缩策略类似于Rocksdb的通用压缩。 默认情况下,当Paimon编写器将记录附加到LSM树时,他们还会根据需要执行压缩。用户还可以选择在专用压缩作业中执行所有压缩。有关更多信息,请参阅专用压实作业。
如图所示:不同Sorted Runs可能具有重叠的主键范围,甚至可能包含相同的主键。查询LSM树时,必须组合所有排序的运行,并且必须根据用户指定的合并引擎和每条记录的时间戳合并具有相同主键的所有记录。写入LSM树的新记录将首先缓冲在内存中。当内存缓冲区已满时,内存中的所有记录都将被排序并刷新到磁盘。现在将创建一个新的排序运行。
插入数据
INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');
snapshot-1的文件内容记录了快照的元信息,比如 manifest list 和 schema id(具体的文件名是随机的,每次实验可能不同):
{ "version" : 3, "id" : 1, "schemaId" : 0, "baseManifestList" : "manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0", "deltaManifestList" : "manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1", "changelogManifestList" : null, "commitUser" : "7d758485-981d-4b1a-a0c6-d34c3eb254bf", "commitIdentifier" : 9223372036854775807, "commitKind" : "APPEND", "timeMillis" : 1684155393354, "logOffsets" : { }, "totalRecordCount" : 1, "deltaRecordCount" : 1, "changelogRecordCount" : 0, "watermark" : -9223372036854775808 }
manifest list包含快照的所有更改,记录的方式为 基线(baseManifestList)+ 变更(deltaManifestList),每个 manifest list 又包含了一个或多个 manifest 记录来描述具体对数据文件的操作。baseManifestList是应用deltaManifest列表中的更改的基础文件。第一次提交将产生1个清单文件,并创建2个清单列表(文件名可能与实验中的文件名不同):
./T/manifest: manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0是 快照-1 的基线(baseManifestList),对应 图-1 中的manifest-list-1-base。因为是首次写入,基线中没有任何内容。 manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 是 快照-1 的变更 (deltaManifestList),对应 图-1 中的manifest-list-1-delta。它包含一个 manifest 记录: manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0是 快照1 对数据文件的操作记录,对应 图-1 中的 manifest-1-0。在本例中,它记录了对数据文件的添加操作,体现在 图-1 中指向数据文件的 ADD 箭头。
接下来我们尝试一次添加一批记录,对应多个分区。在 Flink SQL 中执行以下语句:
INSERT INTO T VALUES (2, 10002, 'varchar00002', '20230502'), (3, 10003, 'varchar00003', '20230503'), (4, 10004, 'varchar00004', '20230504'), (5, 10005, 'varchar00005', '20230505'), (6, 10006, 'varchar00006', '20230506'), (7, 10007, 'varchar00007', '20230507'), (8, 10008, 'varchar00008', '20230508'), (9, 10009, 'varchar00009', '20230509'), (10, 10010, 'varchar00010', '20230510');
Flink 任务执行完成后,第二次提交成功。读者可以通过 SELECT * FROM T;验证表内共有 10 条记录。该次提交创建来一个新的快照,snapshot-2。
我们进入表目录递归查看所有文件:
% ls -atR . ./T: dt=20230501 dt=20230502 dt=20230503 dt=20230504 dt=20230505 dt=20230506 dt=20230507 dt=20230508 dt=20230509 dt=20230510 snapshot schema manifest ./T/snapshot: LATEST snapshot-2 EARLIEST snapshot-1 ./T/manifest: manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # 快照-2 变更 manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # 快照-2 基线 manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # 快照-2 manifest 记录 manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # 快照-1 变更 manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # 快照-1 基线 manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # 快照-1 manifest 记录 ./T/dt=20230501/bucket-0: data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc ... # 20230502 - 20230510 每个分区均包含一个数据文件 ... ./T/schema: schema-0
对应的文件布局发生如下改变:
删除数据
DELETE FROM T WHERE dt >= '20230503';
执行完成后,第三次提交成功并且创建了对应的快照,snapshot-3。进入 Paimon 目录(/tmp/paimon/default.db/查看发现分区并没有被删除;相反地,20230503至 20230510每个分区下面都多了一个数据文件:
./T/dt=20230510/bucket-0: data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # delete语句创建的较新数据文件 data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # insert语句创建的旧数据文件
文件的更新和删除是比较重的操作,虽然逻辑上 20230510 这个分区已经没有数据了,但是为了提高写入性能,Paimon 不会立即对该分区内记录执行合并(Compact),而是新增一条对应的删除操作记录:
首次写入时,该分区内新增记录:+I[10, 10010, 'varchar00010', '20230510'] 后续删除时,该分区内新增记录:-D[10, 10010, 'varchar00010', '20230510']
在查询的时候,Paimon 会根据记录的写入顺序计算数据被更新/删除后的结果,所以执行删除操作后,SELECT * FROM T; 会返回以下 2 条记录,没毛病。
+I[1, 10001, 'varchar00001', '20230501'] +I[2, 10002, 'varchar00002', '20230502']
文件布局变化:需要注意的是, manfiest 记录(manifest-3-0)中包含 8 个 ADD 操作,对应在分区 20230503至 20230510创建的 8 个 数据文件(data-delete-0)。
表压缩(合并)
不管逻辑上是新增/更新/删除记录,每次执行 SQL 语句都会导致文件数量的增加。paimon表的小文件的数量会随着连续快照的增加而增加,这可能会导致读取性能下降。因此,为了减少小文件的数量,需要进行完全压缩。
让我们现在触发完全压缩。确保已将执行模式设置为批处理(在flink-conf.yaml中添加一个键值对execution.runtime-mode:batch),并通过flink-run运行一个全量合并作业:
<FLINK_HOME>/bin/flink run \ /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \ compact \ --warehouse <warehouse-path> \ --database <database-name> \ --table <table-name> \ ## --path file:///tmp/paimon/default.db/T [--partition <partition-name>] \ [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
执行成功后,Paimon 表中所有的数据文件会被合并,同时产生一个新的快照,snapshot-4,快照元信息如下:
{ "version" : 3, "id" : 4, "schemaId" : 0, "baseManifestList" : "manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0", "deltaManifestList" : "manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1", "changelogManifestList" : null, "commitUser" : "a3d951d5-aa0e-4071-a5d4-4c72a4233d48", "commitIdentifier" : 9223372036854775807, "commitKind" : "COMPACT", "timeMillis" : 1684163217960, "logOffsets" : { }, "totalRecordCount" : 38, "deltaRecordCount" : 20, "changelogRecordCount" : 0, "watermark" : -9223372036854775808 }
和之前执行写入操作的快照信息有所不同,commitKind字段是 COMPACT表示这是一个合并产生的快照。
最新的文件布局如下:在 manifest-4-0记录中,出现了对分区数据文件的删除记录。虽然这是个好兆头,意味着表的数据文件不会一直递增下去,但是数据文件并不会被物理删除,而是要等到分区过期(snapshot expire)时通过判断后才会真正被物理上抹去。
我们数一下 manifest-4-0记录包含的操作数量,共有 20 个操作记录,其中有 18 个 DELETE 操作和 2 个 ADD 操作:
分区 20230503 至 20230510,每个分区有 2 个 DELETE 操作,对应 2 个数据文件的删除。 对于分区20230503到20230510,每个分区对两个数据文件执行两次DELETE操作 对于分区20230501到20230502,每个分区对同一数据文件执行一个DELETE操作和一个ADD操作。 需要注意的是,图中没有表示对分区 20230501 至 20230502的文件操作,因为它们不会改动文件内容,只会改动数据文件的 level。感兴趣的读者可以进一步了解 LSM树 中的 level和sorted run的概念。
过期快照
在 manifest 中被标记删除的记录不会立即被物理删除,而是要等到快照过期阶段判断可以安全删除才能和该快照一起被清理。读者可以在 快照过期[4] 文章中了解更多内容。
现在假设在创建 快照-5 的时候执行快照过期,判断需要清理 快照-1 到 快照-4。过程如下:
对于所有被标记删除的数据文件,执行物理删除,记录删除的数据文件对应的分区和 bucket 删除该快照下的所有 changelog 和符合条件的 manifest 文件 删除快照文件,并更新最早的快照(snapshot目录下的EARLIEST记录)
可以看到,分区 20230503 到 20230510被物理删除。
Flink 流式写入
现在,我们以生产中常用的CDC导入为例,即通过 Flink CDC 流批一体读取 MySQL 全量和增量记录写入 Paimon,来串联以上提到的一系列文件操作。本节内容包括源端 CDC 数据的读取,Paimon 数据的写入和提交,异步小文件合并,和快照过期。
MySQL CDC Source流批一体读取 MySQL 全量和增量记录,其中 SnapshotReader负责读取全量记录,BinlogReader负责读取增量记录。 Paimon Sink负责写入数据到 Paimon 表,内部包括CompactManager负责异步触发 bucket 级别的小文件合并。 Committer Operator 是一个组件,负责最终提交写入使数据可见,以及快照过期清理。
接下来,我们以端到端的视角说明数据是如何写入到文件的。MySQL Cdc Source读取快照和增量数据,并在规范化后将其发送到下游。Paimon Sink首先缓存 CDC 记录到内存中的 LSM 树, 在缓存满或者 Flink 快照时flush 数据到磁盘(比如 OSS、HDFS、或者本地文件系统)。请注意,写入的每个数据文件都是经过排序的运行 此时,没有 manifest 文件和快照被创建。在 Flink checkpoint 触发之前,Paimon Sink 会刷新所有缓冲的记录并发送 committable 信息到下游的 Committer Operator,由Committer Operator负责后续的读取和提交。Flink checkpoint过程中,Committer Operator获取上游发送的 committable 信息,创建对应的快照和 manifest list。最终 snapshot 包含该时刻表内所有数据文件的信息。后续可能会进行异步压缩,如果CompactManager判断某个 bucket 需要进行小文件合并,会在 Paimon Sink异步执行小文件合并,并且将合并结果以 committable 信息发往下游。Committer Operator收到信息后,解析出该次合并更改的文件并创建Compact 类型的快照和对应的 manifest 文件。在这种情况下,Committer Operator在一次 Flink checkpoint 中会产生两个快照, 一个是数据写入对应的 Append类型快照,另外一个是小文件合并对应的 Compact类型快照。如果在 checkpoint 间隔窗口内,没有数据文件被写入 Paimon 表, 那么可以通过配置决定是否生成 Append类型快照。Committer Operator也负责快照过期,物理删除被标记的数据文件。
Java API
pom.xml
<dependency> <groupId>org.apache.paimon</groupId> <artifactId>paimon-bundle</artifactId> <version>0.5-SNAPSHOT</version> </dependency>
Create Catalog
import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; public class CreateCatalog { public static void createFilesystemCatalog() { CatalogContext context = CatalogContext.create(new Path("...")); Catalog catalog = CatalogFactory.createCatalog(context); } public static void createHiveCatalog() { // Paimon Hive catalog relies on Hive jars // You should add hive classpath or hive bundled jar. Options options = new Options(); options.set("warehouse", "..."); options.set("metastore", "hive"); options.set("uri", "..."); options.set("hive-conf-dir", "..."); CatalogContext context = CatalogContext.create(options); Catalog catalog = CatalogFactory.createCatalog(context); } }
RenameTable
import org.apache.paimon.fs.Path; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; public class RenameTable { public static void main(String[] args) { Identifier fromTableIdentifier = Identifier.create("my_db", "my_table"); Identifier toTableIdentifier = Identifier.create("my_db", "test_table"); try { catalog.renameTable(fromTableIdentifier, toTableIdentifier, false); } catch (Catalog.TableAlreadyExistException e) { // do something } catch (Catalog.TableNotExistException e) { // do something } } }
AlterTable
Add列不能指定NOT NULL。 无法更新表中的分区列类型。 无法更改主键的可为Null性。 如果列的类型是嵌套行类型,则不支持更新列类型。 不支持将列更新为嵌套行类型。
import org.apache.paimon.fs.Path; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import com.google.common.collect.Lists; import java.util.Arrays; public class AlterTable { public static void main(String[] args) { Identifier identifier = Identifier.create("my_db", "my_table"); Map<String,String> options = new HashMap<>(); options.put("bucket", "4"); options.put("compaction.max.file-num", "40"); catalog.createTable( identifier, new Schema( Lists.newArrayList( new DataField(0, "col1", DataTypes.STRING(), "field1"), new DataField(1, "col2", DataTypes.STRING(), "field2"), new DataField(2, "col3", DataTypes.STRING(), "field3"), new DataField(3, "col4", DataTypes.BIGINT(), "field4"), new DataField( 4, "col5", DataTypes.ROW( new DataField(5, "f1", DataTypes.STRING(), "f1"), new DataField(6, "f2", DataTypes.STRING(), "f2"), new DataField(7, "f3", DataTypes.STRING(), "f3")), "field5"), new DataField(8, "col6", DataTypes.STRING(), "field6")), Lists.newArrayList("col1"), // partition keys Lists.newArrayList("col1", "col2"), //primary key options, "table comment"), false); // add option SchemaChange addOption = SchemaChange.setOption("snapshot.time-retained", "2h"); // remove option SchemaChange removeOption = SchemaChange.removeOption("compaction.max.file-num"); // add column SchemaChange addColumn = SchemaChange.addColumn("col1_after", DataTypes.STRING()); // add a column after col1 SchemaChange.Move after = SchemaChange.Move.after("col1_after", "col1"); SchemaChange addColumnAfterField = SchemaChange.addColumn("col7", DataTypes.STRING(), "", after); // rename column SchemaChange renameColumn = SchemaChange.renameColumn("col3", "col3_new_name"); // drop column SchemaChange dropColumn = SchemaChange.dropColumn("col6"); // update column comment SchemaChange updateColumnComment = SchemaChange.updateColumnComment(new String[]{"col4"}, "col4 field"); // update nested column comment SchemaChange updateNestedColumnComment = SchemaChange.updateColumnComment(new String[]{"col5", "f1"}, "col5 f1 field"); // update column type SchemaChange updateColumnType = SchemaChange.updateColumnType("col4", DataTypes.DOUBLE()); // update column position, you need to pass in a parameter of type Move SchemaChange updateColumnPosition = SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4")); // update column nullability SchemaChange updateColumnNullability = SchemaChange.updateColumnNullability(new String[]{"col4"}, false); // update nested column nullability SchemaChange updateNestedColumnNullability = SchemaChange.updateColumnNullability(new String[]{"col5", "f2"}, false); SchemaChange[] schemaChanges = new SchemaChange[] { addOption, removeOption, addColumn, addColumnAfterField, renameColumn, dropColumn, updateColumnComment, updateNestedColumnComment, updateColumnType, updateColumnPosition, updateColumnNullability, updateNestedColumnNullability}; try { catalog.alterTable(identifier, Arrays.asList(schemaChanges), false); } catch (Catalog.TableNotExistException e) { // do something } catch (Catalog.TableAlreadyExistException e) { // do something } catch (Catalog.DatabaseNotExistException e) { // do something } } }
Batch Read
扫描计划:在全局节点(“坐标”或名为“驱动程序”)中生成计划拆分。读取拆分:分布式任务中的读取拆分。
import org.apache.paimon.data.InternalRow; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.types.DataTypes; import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; public class ReadTable { public static void main(String[] args) { // 1. Create a ReadBuilder and push filter (`withFilter`) // and projection (`withProjection`) if necessary // [{"Alice", 12},{"Bob", 5},{"Emily", 18}] PredicateBuilder builder = new PredicateBuilder(RowType.of(DataTypes.STRING(),DataTypes.INT())); Predicate notNull = builder.isNotNull(0); Predicate greaterOrEqual = builder.greaterOrEqual(1, 12); ReadBuilder readBuilder = table.newReadBuilder() .withProjection(projection) .withFilter(Lists.newArrayList(notNull, greaterOrEqual)); // 2. Plan splits in 'Coordinator' (or named 'Driver') List<Split> splits = readBuilder.newScan().plan().splits(); // 3. Distribute these splits to different tasks // 4. Read a split in task TableRead read = readBuilder.newRead(); RecordReader<InternalRow> reader = read.createReader(splits); reader.forEachRemaining(ReadTable::readRow); } }
Batch Write
import java.util.List; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; public class WriteTable { public static void main(String[] args) { // 1. Create a WriteBuilder (Serializable) BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder() .withOverwrite(staticPartition); // 2. Write records in distributed tasks BatchTableWrite write = writeBuilder.newWrite(); GenericRow record1 = GenericRow.of(BinaryString.fromString("Alice"), 12); GenericRow record2 = GenericRow.of(BinaryString.fromString("Bob"), 5); GenericRow record3 = GenericRow.of(BinaryString.fromString("Emily"), 18); write.write(record1); write.write(record2); write.write(record3); List<CommitMessage> messages = write.prepareCommit(); // 3. Collect all CommitMessages to a global node and commit BatchTableCommit commit = writeBuilder.newCommit(); commit.commit(messages); // Abort unsuccessful commit to delete data files // commit.abort(messages); } }
Stream Read
import java.io.IOException; import java.util.List; import org.apache.paimon.data.InternalRow; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableRead; public class StreamReadTable { public static void main(String[] args) throws IOException { // 1. Create a ReadBuilder and push filter (`withFilter`) // and projection (`withProjection`) if necessary ReadBuilder readBuilder = table.newReadBuilder() .withProjection(projection) .withFilter(filter); // 2. Plan splits in 'Coordinator' (or named 'Driver') StreamTableScan scan = readBuilder.newStreamScan(); while (true) { List<Split> splits = scan.plan().splits(); // Distribute these splits to different tasks Long state = scan.checkpoint(); // can be restored in scan.restore(state) after failover } // 3. Read a split in task TableRead read = readBuilder.newRead(); RecordReader<InternalRow> reader = read.createReader(splits); reader.forEachRemaining(row -> System.out.println(row)); } }
Stream Write exactly-once关键保证:
CommitUser表示一个用户。一个用户可以提交多次。在分布式处理中,您应该使用相同的commitUser。 不同的应用程序需要使用不同的commitUsers。 StreamTableWrite和StreamTableCommit的commitIdentifier需要一致,并且需要为下一次提交增加id。 发生故障时,如果仍有未提交的CommitMessages,请使用StreamTableCommit#filterCommitted通过commitIdentifier排除已提交的消息。
import java.util.List; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.StreamWriteBuilder; public class StreamWriteTable { public static void main(String[] args) throws Exception { // 1. Create a WriteBuilder (Serializable) StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); // 2. Write records in distributed tasks StreamTableWrite write = writeBuilder.newWrite(); // commitIdentifier like Flink checkpointId long commitIdentifier = 0; while (true) { write.write(record1); write.write(record2); write.write(record3); List<CommitMessage> messages = write.prepareCommit( false, commitIdentifier); commitIdentifier++; } // 3. Collect all CommitMessages to a global node and commit StreamTableCommit commit = writeBuilder.newCommit(); commit.commit(commitIdentifier, messages); // 4. When failover, you can use 'filterCommitted' to filter committed commits. commit.filterCommitted(committedIdentifiers); } }
Flink API
pox.xml
<dependency> <groupId>org.apache.paimon</groupId> <artifactId>paimon-flink-1.17</artifactId> <version>0.5-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>1.17.0</version> <scope>provided</scope> </dependency>
Read from Table
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class ReadFromTable { public static void readFrom() throws Exception { // create environments of both APIs StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create paimon catalog tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')"); tableEnv.executeSql("USE CATALOG paimon"); // convert to DataStream Table table = tableEnv.sqlQuery("SELECT * FROM my_paimon_table"); DataStream<Row> dataStream = tableEnv.toChangelogStream(table); // use this datastream dataStream.executeAndCollect().forEachRemaining(System.out::println); // prints: // +I[Bob, 12] // +I[Alice, 12] // -U[Alice, 12] // +U[Alice, 14] } }
Write to Table
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; public class WriteToTable { public static void writeTo() { // create environments of both APIs StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a changelog DataStream DataStream<Row> dataStream = env.fromElements( Row.ofKind(RowKind.INSERT, "Alice", 12), Row.ofKind(RowKind.INSERT, "Bob", 5), Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12), Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100)) .returns( Types.ROW_NAMED( new String[] {"name", "age"}, Types.STRING, Types.INT)); // interpret the DataStream as a Table Schema schema = Schema.newBuilder() .column("name", DataTypes.STRING()) .column("age", DataTypes.INT()) .build(); Table table = tableEnv.fromChangelogStream(dataStream, schema); // create paimon catalog tableEnv.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')"); tableEnv.executeSql("USE CATALOG paimon"); // register the table under a name and perform an aggregation tableEnv.createTemporaryView("InputTable", table); // insert into paimon table from your data stream table tableEnv.executeSql("INSERT INTO sink_paimon_table SELECT * FROM InputTable"); } }
Read Performance
完全压缩
配置“full-compaction.delta-commits’”在Flink写入中定期执行完全压缩。并且它可以确保分区在写入结束之前被完全压缩。 建议不要设置超过快照过期时间(默认为1小时)的值。例如,如果检查点间隔为1分钟,则建议将“full-compaction.delta-commits”设置为30。
主键表
对于主键表,它是一种MOR(MergeOnRead)技术。在读取数据时,多层LSM数据被合并,并行性的数量将受到桶的数量的限制。尽管Paimon的合并会很有效,但它仍然无法赶上普通的AppendOnly表。 如果希望在某些情况下查询足够快,但只能找到较旧的数据,您可以: 配置“full-compaction.delta-commits”,在写入数据时(当前仅为Flink),将定期执行完全压缩。 将“scan.mode”配置为“compacted-full”,读取数据时,将拾取完全压缩的快照。读取性能良好。 可以在读取时灵活地平衡查询性能和数据延迟。
仅追加表
小文件可能会减慢读取速度并影响DFS的稳定性。默认情况下,当单个存储桶中的小文件超过“compaction.max.file-num”(默认为50个)时,会触发压缩。但是,当存在多个bucket时,会生成许多小文件。 可以使用完全压缩来减少小文件。完全压缩将消除大多数小文件。
Write Performance
并行度
写入初始化
在写入初始化过程中,bucket的写入程序需要读取所有历史文件。如果这里存在瓶颈(例如,同时写入大量分区),可以使用write-manifest-cache来缓存读取的清单数据,以加速初始化。
压缩
触发压缩的排序运行数 要暂停写入的排序运行数 全量合并作业
Dedicated Compaction Job: 可能会导致写吞吐量不稳定,因为在执行压缩时吞吐量可能会暂时下降。压缩会将一些数据文件标记为“已删除”(并非真正删除,有关更多信息,请参阅过期快照)。如果多个写入程序标记同一个文件,则在提交更改时会发生冲突。Paimon会自动解决冲突,但这可能会导致作业重新启动
<FLINK_HOME>/bin/flink run \ /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \ compact \ --warehouse <warehouse-path> \ --database <database-name> \ --table <table-name> \ [--partition <partition-name>] \ [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
<FLINK_HOME>/bin/flink run \ /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \ compact \ --warehouse s3:///path/to/warehouse \ --database test_db \ --table test_table \ --partition dt=20221126,hh=08 \ --partition dt=20221127,hh=09 \ --catalog-conf s3.endpoint=https://****.com \ --catalog-conf s3.access-key=***** \ --catalog-conf s3.secret-key=*****
内存
写入程序的内存缓冲区(memory buffer,),由单个任务的所有写入程序共享并抢占。此内存值可以通过write-buffer-size进行调整。 合并几个排序运行以进行压缩时消耗的内存。可以通过num-sorted-run.compaction-tigger选项进行调整,以更改要合并的排序运行数。 如果行非常大,在进行压缩时,一次读取太多行数据可能会消耗大量内存。减少read.batch-size选项可以减轻这种情况的影响。 写入不可调整的柱状(ORC、Parquet等)文件所消耗的内存。
数据写入paimon的流程