流数据湖平台Apache Paimon(二)集成 Flink 引擎

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 流数据湖平台Apache Paimon(二)集成 Flink 引擎

第2章 集成 Flink 引擎

Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。

2.1 环境准备

环境准备

2.1.1 安装 Flink

1)上传并解压Flink安装包

tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/

2)配置环境变量

sudo vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile.d/my_env.sh

2.1.2 上传 jar 包

1)下载并上传Paimon的jar包

jar包下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/0.5-SNAPSHOT/

2)拷贝paimon的jar包到flink的lib目录下

cp paimon-flink-1.17-0.5-20230703.002437-67.jar /opt/module/flink-1.17.0/lib

2.1.3 启动 Hadoop

(略)

2.1.4 启动 sql-client

1)修改flink-conf.yaml配置

vim /opt/module/flink-1.17.0/conf/flink-conf.yaml

#解决中文乱码,1.17之前参数是env.java.opts

env.java.opts.all: -Dfile.encoding=UTF-8

classloader.check-leaked-classloader: false

taskmanager.numberOfTaskSlots: 4

execution.checkpointing.interval: 10s

state.backend: rocksdb

state.checkpoints.dir: hdfs://hadoop102:8020/ckps

state.backend.incremental: true

2)启动 Flink集群

(1)解决依赖问题

cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/

(2)这里以 Yarn-Session模式为例

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

3)启动Flink的sql-client

/opt/module/flink-1.17.0/bin/sql-client.sh -s yarn-session

4)设置结果显示模式

SET ‘sql-client.execution.result-mode’ = ‘tableau’;

2.2 Catalog

Paimon Catalog可以持久化元数据,当前支持两种类型的metastore:

文件系统(默认):将元数据和表文件存储在文件系统中。

hive:在 hive metastore中存储元数据。用户可以直接从 Hive 访问表。

2.2.1 文件系统

CREATE CATALOG fs_catalog WITH (
‘type’ = ‘paimon’,
‘warehouse’ = ‘hdfs://192.168.88.100:8020/paimon/fs’
);
USE CATALOG fs_catalog;
USE CATALOG fs_catalog;

2.2.2 Hive Catalog

通过使用Hive Catalog,对Catalog的更改将直接影响相应的hive metastore。在此类Catalog中创建的表也可以直接从 Hive 访问。

要使用 Hive Catalog,数据库名称、表名称和字段名称应小写。

1)上传 hive-connector

将flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar上传到Flink的lib目录下

2)重启yarn-session集群

3)启动hive的metastore服务

nohup hive --service metastore &

4)创建HiveCatalog

CREATE CATALOG hive_catalog WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
'uri' = 'thrift://hadoop102:9083',
'hive-conf-dir' = '/opt/module/hive/conf',
  'warehouse' = 'hdfs://hadoop102:8020/paimon/hive'
);

USE CATALOG hive_catalog;

5)注意事项

使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置

vim /opt/module/hive/conf/hive-site.xml;

<property>
    <name>hive.metastore.disallow.incompatible.col.type.changes</name>
    <value>false</value>
  </property>

上述配置需要在hive-site.xml中配置,且hive metastore服务需要重启。

如果使用的是 Hive3,请禁用 Hive ACID:

hive.strict.managed.tables=false

hive.create.as.insert.only=false

metastore.create.as.acid=false

2.2.3 sql 初始化文件

1)创建初始化sql文件

vim conf/sql-client-init.sql

CREATE CATALOG fs_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'hdfs://hadoop102:8020/paimon/fs'
);
CREATE CATALOG hive_catalog WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
'uri' = 'thrift://hadoop102:9083',
'hive-conf-dir' = '/opt/module/hive/conf',
  'warehouse' = 'hdfs://hadoop102:8020/paimon/hive'
);
USE CATALOG hive_catalog;
SET 'sql-client.execution.result-mode' = 'tableau';

2)启动sql-client时,指定该sql初始化文件

bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql

3)查看catalog

show catalogs;

show current catalog;

2.3 DDL

2.3.1 建表

2.3.1.1 管理表

在 Paimon Catalog中创建的表就是Paimon的管理表,由Catalog管理。当表从Catalog中删除时,其表文件也将被删除,类似于Hive的内部表。

1)创建表

CREATE TABLE test (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING,
  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

2)创建分区表

CREATE TABLE test_p (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING,
  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

通过配置partition.expiration-time,可以自动删除过期的分区。

如果定义了主键,则分区字段必须是主键的子集。

可以定义以下三类字段为分区字段:

创建时间(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。

事件时间:事件时间是原表中的一个字段。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使你声明了包含分区字段的主键,也能达到独特的效果。

CDC op_ts:不能定义为分区字段,无法知道之前的记录时间戳。

3)Create Table As

表可以通过查询的结果创建和填充,例如,我们有一个这样的sql: CREATE TABLE table_b AS SELECT id, name FORM table_a, 生成的表table_b将相当于创建表并插入数据以下语句:CREATE TABLE table_b(id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

使用CREATE TABLE AS SELECT时我们可以指定主键或分区。

CREATE TABLE test1(
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE test2 AS SELECT * FROM test1;
– 指定分区
CREATE TABLE test2_p WITH (‘partition’ = ‘dt’) AS SELECT * FROM test_p;
– 指定配置
CREATE TABLE test3(
 user_id BIGINT,
 item_id BIGINT
) WITH (‘file.format’ = ‘orc’);
CREATE TABLE test3_op WITH (‘file.format’ = ‘parquet’) AS SELECT * FROM test3;
– 指定主键
CREATE TABLE test_pk WITH (‘primary-key’ = ‘dt,hh’) AS SELECT * FROM test;
– 指定主键和分区
CREATE TABLE test_all WITH (‘primary-key’ = ‘dt,hh’, ‘partition’ = ‘dt’) AS SELECT * FROM test_p;

4)Create Table Like

创建与另一个表具有相同schema、分区和表属性的表。

CREATE TABLE test_ctl LIKE test;

5)表属性

用户可以指定表属性来启用Paimon的功能或提高Paimon的性能。有关此类属性的完整列表,请参阅配置: https://paimon.apache.org/docs/master/maintenance/configurations/

CREATE TABLE tbl(
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING,
  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) 
WITH (
  'bucket' = '2',
  'bucket-key' = 'user_id'
);

2.3.1.2 外部表

外部表由Catalog记录但不管理。如果删除外部表,其表文件不会被删除,类似于Hive的外部表。

Paimon 外部表可以在任何Catalog中使用。如果您不想创建Paimon Catalog而只想读/写表,则可以考虑外部表。

CREATE TABLE ex (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING,
  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = 'hdfs://hadoop102:8020/paimon/external/ex',
  'auto-create' = 'true' 
);

2.3.1.3 临时表

仅 Flink 支持临时表。与外部表一样,临时表只是记录,但不由当前 Flink SQL 会话管理。如果临时表被删除,其资源将不会被删除。当 Flink SQL 会话关闭时,临时表也会被删除。与外部表的区别在于,临时表在Paimon Catalog中创建。

如果想将Paimon Catalog与其他表一起使用,但不想将它们存储在其他Catalog中,可以创建临时表。

USE CATALOG hive_catalog;
CREATE TEMPORARY TABLE temp (
  k INT,
  v STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://hadoop102:8020/temp.csv',
  'format' = 'csv'
);

2.3.2 修改表

2.3.2.1 修改表

1)更改/添加表属性

ALTER TABLE test SET (
‘write-buffer-size’ = ‘256 MB’
);

2)重命名表名称

ALTER TABLE test1 RENAME TO test_new;

3)删除表属性

ALTER TABLE test RESET (‘write-buffer-size’);

2.3.2.2 修改列

1)添加新列

ALTER TABLE test ADD (c1 INT, c2 STRING);

2)重命名列名称

ALTER TABLE test RENAME c1 TO c0;

3)删除列

ALTER TABLE test DROP (c0, c2);

4)更改列的可为空性

CREATE TABLE test_null(
id INT PRIMARY KEY NOT ENFORCED,
coupon_info FLOAT NOT NULL
);
– 列coupon_info修改成允许为null
ALTER TABLE test_null MODIFY coupon_info FLOAT;
– 列coupon_info修改成不允许为null
– 如果表中已经有null值, 修改之前先设置如下参数删除null值
SET ‘table.exec.sink.not-null-enforcer’ = ‘DROP’;
ALTER TABLE test_null MODIFY coupon_info FLOAT NOT NULL;

5)更改列注释

ALTER TABLE test MODIFY user_id BIGINT COMMENT ‘user id’;

6)添加列位置

ALTER TABLE test ADD a INT FIRST;

ALTER TABLE test ADD b INT AFTER a;

7)更改列位置

ALTER TABLE test MODIFY b INT FIRST;

ALTER TABLE test MODIFY a INT AFTER user_id;

8)更改列类型

ALTER TABLE test MODIFY a DOUBLE;

2.3.2.3 修改水印

1)添加水印

CREATE TABLE test_wm (
id INT,
name STRING,
ts BIGINT
);
ALTER TABLE test_wm ADD(
et AS to_timestamp_ltz(ts,3),
WATERMARK FOR et AS et - INTERVAL ‘1’ SECOND
);

2)更改水印

ALTER TABLE test_wm MODIFY WATERMARK FOR et AS et - INTERVAL ‘2’ SECOND;

3)去掉水印

ALTER TABLE test_wm DROP WATERMARK;

2.4 DML

2.4.1 插入数据

INSERT 语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致。

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }

part_spec

可选,指定分区的键值对列表,多个用逗号分隔。可以使用类型文字(例如,date’2019-01-02’)。

语法: PARTITION (分区列名称 = 分区列值 [ , … ] )

column_list

可选,指定以逗号分隔的字段列表。

语法:(col_name1 [,column_name2, …])

所有指定的列都应该存在于表中,并且不能相互重复。它包括除静态分区列之外的所有列。字段列表的大小应与 VALUES 子句或查询中的数据大小完全相同。

value_expr

指定要插入的值。可以插入显式指定的值或 NULL。必须使用逗号分隔子句中的每个值。可以指定多于一组的值来插入多行。

语法:VALUES ( { 值 | NULL } [ , … ] ) [ , ( … ) ]

目前,Flink 不支持直接使用 NULL,因此需要将 NULL 转换为实际数据类型值,比如“CAST (NULL AS STRING)”

注意:将 Nullable 字段写入 Not-null 字段

不能将另一个表的可为空列插入到一个表的非空列中。Flink可以使用COALESCE函数来处理,比如A表的key1是not null,B表的key2是nullable:

INSERT INTO A key1 SELECT COALESCE(key2, ) FROM B

案例:

INSERT INTO test VALUES(1,1,‘order’,‘2023-07-01’,‘1’), (2,2,‘pay’,‘2023-07-01’,‘2’);
INSERT INTO test_p PARTITION(dt=‘2023-07-01’,hh=‘1’) VALUES(3,3, ‘pv’);
– 执行模式区分流、批
INSERT INTO test_p SELECT * from test;
Paimon支持在sink阶段通过partition和bucket对数据进行shuffle。

2.4.2 覆盖数据

覆盖数据只支持batch模式。默认情况下,流式读取将忽略 INSERT OVERWRITE 生成的提交。如果你想读取

OVERWRITE的提交,你可以配置streaming-read-overwrite。
RESET ‘execution.checkpointing.interval’;
SET ‘execution.runtime-mode’ = ‘batch’;

1)覆盖未分区的表

INSERT OVERWRITE test VALUES(3,3,‘pay’,‘2023-07-01’,‘2’);

2)覆盖分区表

对于分区表,Paimon默认的覆盖模式是动态分区覆盖(即Paimon只删除insert overwrite数据中出现的分区)。您可以配置动态分区覆盖来更改它。

INSERT OVERWRITE test_p SELECT * from test;

覆盖指定分区:

INSERT OVERWRITE test_p PARTITION (dt = ‘2023-07-01’, hh = ‘2’) SELECT user_id,item_id,behavior from test;

3)清空表

可以使用 INSERT OVERWRITE 通过插入空值来清除表(关闭动态分区覆盖)。
INSERT OVERWRITE test_p/*+ OPTIONS(‘dynamic-partition-overwrite’=‘false’) */ SELECT * FROM test_p WHERE false;

2.4.3 更新数据

目前,Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新记录。您可以在Flink的批处理模式下执行UPDATE。

只有主键表支持此功能。不支持更新主键。

MergeEngine 需要deduplicate或partial-update才能支持此功能。(默认deduplicate)

UPDATE test SET item_id = 4, behavior = ‘pv’ WHERE user_id = 3;

2.4.4 删除数据

从表中删除(Flink 1.17):

只有写入模式设置为change-log的表支持此功能。(有主键默认就是change-log)

如果表有主键,MergeEngine需要为deduplicate。(默认deduplicate)

DELETE FROM test WHERE user_id = 3;

2.4.5 Merge Into

通过merge into实现行级更新,只有主键表支持此功能。该操作不会产生 UPDATE_BEFORE,因此不建议设置 ‘changelog-producer’ = ‘input’。

merge-into 操作使用“upsert”语义而不是“update”,这意味着如果该行存在,则执行更新,否则执行插入。

1)语法说明:

<FLINK_HOME>/bin/flink run \
  /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
  merge-into \
  --warehouse <warehouse-path> \
  --database <database-name> \
  --table <target-table> \
  [--target-as <target-table-alias>] \
  --source-table <source-table-name> \
  [--source-sql <sql> ...]\
  --on <merge-condition> \
  --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \
  --matched-upsert-condition <matched-condition> \
  --matched-upsert-set <upsert-changes> \
  --matched-delete-condition <matched-condition> \
  --not-matched-insert-condition <not-matched-condition> \
  --not-matched-insert-values <insert-values> \
  --not-matched-by-source-upsert-condition <not-matched-by-source-condition> \
  --not-matched-by-source-upsert-set <not-matched-upsert-changes> \
  --not-matched-by-source-delete-condition <not-matched-by-source-condition> \
  [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
--source-sql <sql> 可以传递sql来配置环境并在运行时创建源表。

“match”的说明:

(1)matched:更改的行来自目标表,每个行都可以根据条件匹配源表行(source ∩ target):

合并条件(–on)

匹配条件(–matched-xxx-condition)

(2)not-matched:更改的行来自源表,并且根据条件所有行都不能与任何目标表的行匹配(source – target):

合并条件(–on)

不匹配条件(–not-matched-xxx-condition):不能使用目标表的列来构造条件表达式。

(3)not-matched-by-source:更改的行来自目标表,并且基于条件所有行都不能与任何源表的行匹配(target – source):

合并条件(–on)

源不匹配条件(–not-matched-by-source-xxx-condition):不能使用源表的列来构造条件表达式。

2)案例实操

需要用到paimon-flink-action-xxxx.jar,上传:

cp paimon-flink-action-0.5-20230703.002437-53.jar /opt/module/flink-1.17.0/opt

下载地址:

https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/0.5-SNAPSHOT/

(1)准备测试表:

use catalog hive_catalog;
create database test;
use test;
CREATE TABLE ws1 (
  id INT,
  ts BIGINT,
  vc INT,
  PRIMARY KEY (id) NOT ENFORCED
);
INSERT INTO ws1 VALUES(1,1,1),(2,2,2),(3,3,3);
CREATE TABLE ws_t (
  id INT,
  ts BIGINT,
  vc INT,
  PRIMARY KEY (id) NOT ENFORCED
);
INSERT INTO ws_t VALUES(2,2,2),(3,3,3),(4,4,4),(5,5,5);

(2)案例一: ws_t与ws1匹配id,将ws_t中ts>2的vc改为10,ts<=2的删除

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table ws_t \
–source-table test.ws1 \
–on “ws_t.id = ws1.id” \
–merge-actions matched-upsert,matched-delete \
–matched-upsert-condition “ws_t.ts > 2” \
–matched-upsert-set “vc = 10” \
–matched-delete-condition “ws_t.ts <= 2”

(3)案例二: ws_t与ws1匹配id,匹配上的将ws_t中vc加10,ws1中没匹配上的插入ws_t中

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table ws_t \
–source-table test.ws1 \
–on “ws_t.id = ws1.id” \
–merge-actions matched-upsert,not-matched-insert \
–matched-upsert-set “vc = ws_t.vc + 10” \
–not-matched-insert-values “*”

(4)案例三: ws_t与ws1匹配id,ws_t中没匹配上的,ts大于4则vc加20,ts=4则删除

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table ws_t \
–source-table test.ws1 \
–on “ws_t.id = ws1.id” \
–merge-actions not-matched-by-source-upsert,not-matched-by-source-delete \
–not-matched-by-source-upsert-condition “ws_t.ts > 4” \
–not-matched-by-source-upsert-set “vc = ws_t.vc + 20” \
–not-matched-by-source-delete-condition " ws_t.ts = 4"

(5)案例四: 使用–source-sql创建新catalog下的源表,匹配ws_t的id,没匹配上的插入ws_t

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table ws_t \
–source-sql “CREATE CATALOG fs2 WITH (‘type’ = ‘paimon’,‘warehouse’ = ‘hdfs://hadoop102:8020/paimon/fs2’)” \
–source-sql “CREATE DATABASE IF NOT EXISTS fs2.test” \
–source-sql “CREATE TEMPORARY VIEW fs2.test.ws2 AS SELECT id+10 as id,ts,vc FROM test.ws1” \
–source-table fs2.test.ws2 \
–on “ws_t.id = ws2. id” \
–merge-actions not-matched-insert\
–not-matched-insert-values “*”

2.5 DQL查询表

2.5.1 批量查询

就像所有其他表一样,Paimon 表可以使用 SELECT 语句进行查询。

Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。

在sql-client中,设置执行模式为批即可:

RESET ‘execution.checkpointing.interval’;

SET ‘execution.runtime-mode’ = ‘batch’;

2.5.1.1 时间旅行

1)读取指定id的快照

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘1’) */;

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘2’) */;

2)读取指定时间戳的快照

– 查看快照信息

SELECT * FROM ws_t&snapshots;

SELECT * FROM ws_t /*+ OPTIONS(‘scan.timestamp-millis’ = ‘1688369660841’) */;

3)读取指定标签

SELECT * FROM ws_t /*+ OPTIONS(‘scan.tag-name’ = ‘my-tag’) */;

2.5.1.2 增量查询

读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照 3 和快照 5 之间的更改:

SELECT * FROM ws_t /*+ OPTIONS(‘incremental-between’ = ‘3,5’) */;

在batch模式中,不返回DELETE记录,因此-D的记录将被删除。如果你想查看DELETE记录,可以查询audit_log表:

SELECT * FROM ws_t$audit_log /*+ OPTIONS(‘incremental-between’ = ‘3,5’) */;

2.5.2 流式查询

默认情况下,Streaming read 在第一次启动时会生成表上的最新快照,并继续读取最新的更改。

SET ‘execution.checkpointing.interval’=‘30s’;

SET ‘execution.runtime-mode’ = ‘streaming’;

也可以从最新读取,设置扫描模式:

SELECT * FROM ws_t /*+ OPTIONS(‘scan.mode’ = ‘latest’) */

2.5.2.1 时间旅行

如果只想处理今天及以后的数据,则可以使用分区过滤器来实现:

SELECT * FROM test_p WHERE dt > ‘2023-07-01’

如果不是分区表,或者无法按分区筛选,可以使用时间旅行的流读取。

1)从指定快照id开始读取变更数据

SELECT * FROM ws_t /*+ OPTIONS(‘scan.snapshot-id’ = ‘1’) */;

2)从指定时间戳开始读取

SELECT * FROM ws_t /*+ OPTIONS(‘scan.timestamp-millis’ = ‘1688369660841’) */;

3)第一次启动时读取指定快照数据,并继续读取变化

SELECT * FROM ws_t /*+ OPTIONS(‘scan.mode’=‘from-snapshot-full’,‘scan.snapshot-id’ = ‘3’) */;

2.5.2.2 Consumer ID

1)优点

在流式读取表时指定consumer-id,这是一个实验性功能。

当流读取Paimon表时,下一个快照id将被记录到文件系统中。这有几个优点:

当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照 ID 开始读取。

在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。

当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着您可以跟踪整个管道的水印进度。

注意:消费者将防止快照过期。可以指定“consumer.expiration-time”来管理消费者的生命周期。

2)案例演示

指定consumer-id开始流式查询:

SELECT * FROM ws_t /*+ OPTIONS(‘consumer-id’ = ‘atguigu’) */;

停掉原先的流式查询,插入数据:

insert into ws_t values(6,6,6);

再次指定consumer-id流式查询:

SELECT * FROM ws_t /*+ OPTIONS(‘consumer-id’ = ‘atguigu’) */;

2.5.3 查询优化

强烈建议在查询时指定分区和主键过滤器,这将加快查询的数据跳过速度。

可以加速数据跳跃的过滤函数有:

=

<

<=

=

IN (…)

LIKE ‘abc%’

IS NULL

Paimon会按主键对数据进行排序,从而加快点查询和范围查询的速度。使用复合主键时,查询过滤器最好形成主键的最左边前缀,以获得良好的加速效果。

CREATE TABLE orders (
catalog_id BIGINT,
order_id BIGINT,
…,
PRIMARY KEY (catalog_id, order_id) NOT ENFORCED – composite primary key
)

通过为主键最左边的前缀指定范围过滤器,查询获得了很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
SELECT * FROM orders
WHERE catalog_id=1025jkjkjk
AND order_id>2035 AND order_id<6000;

下面例子的过滤器不能很好地加速查询:

SELECT * FROM orders WHERE order_id=29495;
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

2.6 系统表

系统表包含有关每个表的元数据和信息,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。

2.6.1 快照表 Snapshots Table

通过snapshots表可以查询表的快照历史信息,包括快照中发生的记录数。

SELECT * FROM ws_t$snapshots;

通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。

2.6.2 模式表 Schemas Table

通过schemas表可以查询该表的历史schema。

SELECT * FROM ws_t$schemas;

可以连接快照表和模式表以获取给定快照的字段。

SELECT s.snapshot_id, t.schema_id, t.fields
FROM ws_ts n a p s h o t s s J O I N w s t snapshots s JOIN ws_tsnapshotssJOINwstschemas t
ON s.schema_id=t.schema_id where s.snapshot_id=3;

2.6.3 选项表 Options Table

可以通过选项表查询DDL中指定的表的选项信息。未显示的选项将是默认值。

SELECT * FROM ws_t$options;

2.6.4 审计日志表 Audit log Table

如果需要审计表的changelog,可以使用audit_log系统表。通过audit_log表,获取表增量数据时可以获取rowkind列。您可以利用该栏目进行过滤等操作来完成审核。

rowkind 有四个值:

+I:插入操作。

-U:使用更新行的先前内容进行更新操作。

+U:使用更新行的新内容进行更新操作。

-D:删除操作。

SELECT * FROM ws_t$audit_log;

2.6.5 文件表 Files Table

可以查询特定快照表的文件。

– 查询最新快照的文件

SELECT * FROM ws_t$files;

– 查询指定快照的文件

SELECT * FROM ws_t$files /*+ OPTIONS(‘scan.snapshot-id’=‘1’) */;

2.6.6 标签表 Tags Table

通过tags表可以查询表的标签历史信息,包括基于哪些快照进行标签以及快照的一些历史信息。您还可以通过名称获取所有标签名称和时间旅行到特定标签的数据。

SELECT * FROM ws_t$tags;

2.7 维表Join

Paimon支持Lookup Join语法,它用于从 Paimon 查询的数据来补充维度字段。要求一个表具有处理时间属性,而另一个表由查找源连接器支持。

Paimon 支持 Flink 中具有主键的表和append-only的表查找联接。以下示例说明了此功能。

USE CATALOG fs_catalog;
CREATE TABLE customers (
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
country STRING,
zip STRING
);
INSERT INTO customers VALUES(1,‘zs’,‘ch’,‘123’),(2,‘ls’,‘ch’,‘456’), (3,‘ww’,‘ch’,‘789’);
CREATE TEMPORARY TABLE Orders (
order_id INT,
total INT,
customer_id INT,
proc_time AS PROCTIME()
) WITH (
‘connector’ = ‘datagen’,
‘rows-per-second’=‘1’,
‘fields.order_id.kind’=‘sequence’,
‘fields.order_id.start’=‘1’,
‘fields.order_id.end’=‘1000000’,
‘fields.total.kind’=‘random’,
‘fields.total.min’=‘1’,
‘fields.total.max’=‘1000’,
‘fields.customer_id.kind’=‘random’,
‘fields.customer_id.min’=‘1’,
‘fields.customer_id.max’=‘3’
);
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

Lookup Join算子会在本地维护一个RocksDB缓存并实时拉取表的最新更新。查找连接运算符只会提取必要的数据,因此您的过滤条件对于性能非常重要。

如果Orders(主表)的记录Join缺失,因为customers(查找表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。

2.8 CDC集成

Paimon 支持多种通过模式演化将数据提取到 Paimon 表中的方法。这意味着添加的列会实时同步到Paimon表中,并且不会为此重新启动同步作业。

目前支持以下同步方式:

MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。

MySQL同步数据库:将整个MySQL数据库同步到一个Paimon数据库中。

API同步表:将您的自定义DataStream输入同步到一张Paimon表中。

Kafka同步表:将一个Kafka topic的表同步到一张Paimon表中。

Kafka同步数据库:将一个包含多表的Kafka主题或多个各包含一表的主题同步到一个Paimon数据库中。

2.8.1 MySQL

添加Flink CDC 连接器。

cp flink-sql-connector-mysql-cdc-2.4.0.jar /opt/module/flink-1.17.0/lib

重启yarn-session集群和sql-client。

2.8.1.1 同步表

1)语法说明

/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-table
–warehouse \
–database \
–table \
[–partition-keys ] \
[–primary-keys ] \
[–computed-column <‘column-name=expr-name(args[, …])’> [–computed-column …]] \
[–mysql-conf [–mysql-conf …]] \
[–catalog-conf [–catalog-conf …]] \
[–table-conf [–table-conf …]]

参数说明:

配置 描述
–warehouse Paimon仓库路径。
–database Paimon Catalog中的数据库名称。
–table Paimon 表名称。
–partition-keys Paimon 表的分区键。如果有多个分区键,请用逗号连接,例如“dt,hh,mm”。
–primary-keys Paimon 表的主键。如果有多个主键,请用逗号连接,例如“buyer_id,seller_id”。
–computed-column 计算列的定义。参数字段来自 MySQL 表字段名称。
–mysql-conf Flink CDC MySQL 源表的配置。每个配置都应以“key=value”的格式指定。主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
–catalog-conf Paimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-conf Paimon 表sink的配置。每个配置都应以“key=value”的格式指定。

如果指定的 Paimon 表不存在,此操作将自动创建该表。其schema将从所有指定的 MySQL 表派生。如果 Paimon 表已存在,则其schema将与所有指定 MySQL 表的schema进行比较。

2)案例实操

(1)MySQL一张表同步到Paimon一张表

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
mysql-sync-table \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table order_info_cdc \
–primary-keys id \
–mysql-conf hostname=hadoop102 \
–mysql-conf username=root \
–mysql-conf password=000000 \
–mysql-conf database-name=gmall \
–mysql-conf table-name=‘order_info’ \
–catalog-conf metastore=hive \
–catalog-conf uri=thrift://hadoop102:9083 \
–table-conf bucket=4 \
–table-conf changelog-producer=input \
–table-conf sink.parallelism=4

(2)MySQL多张表同步到Paimon一张表

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
mysql-sync-table \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table order_cdc \
–primary-keys id \
–mysql-conf hostname=hadoop102 \
–mysql-conf username=root \
–mysql-conf password=000000 \
–mysql-conf database-name=gmall \
–mysql-conf table-name=‘order_.*’ \
–catalog-conf metastore=hive \
–catalog-conf uri=thrift://hadoop102:9083 \
–table-conf bucket=4 \
–table-conf changelog-producer=input \
–table-conf sink.parallelism=4

2.8.1.2 同步数据库

1)语法说明

/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-database
–warehouse \
–database \
[–ignore-incompatible ] \
[–table-prefix ] \
[–table-suffix ] \
[–including-tables ] \
[–excluding-tables ] \
[–mysql-conf [–mysql-conf …]] \
[–catalog-conf [–catalog-conf …]] \
[–table-conf [–table-conf …]]

参数说明:

配置 描述
–warehouse Paimon仓库路径。
–database Paimon Catalog中的数据库名称。
–ignore-incompatible 默认为 false,在这种情况下,如果 Paimon 中存在 MySQL 表名,并且它们的 schema 不兼容,则会抛出异常。您可以显式将其指定为 true 以忽略不兼容的表和异常。
–table-prefix 所有需要同步的Paimon表的前缀。例如,如果您希望所有同步表都以“ods_”作为前缀,则可以指定“–table-prefix ods_”。
–table-suffix 所有需要同步的Paimon表的后缀。用法与“–table-prefix”相同。
–including-tables 用于指定要同步哪些源表。您必须使用“|”分隔多个表,例如:‘a|b|c’。支持正则表达式,例如指定“–include-tables test|paimon.*”表示同步表’test’和所有表都以“paimon”开头。
–excluding-tables 用于指定哪些源表不同步。用法与“–include-tables”相同。如果同时指定了“-- except-tables”,则“-- except-tables”的优先级高于“–include-tables”。
–mysql-conf Flink CDC MySQL源表的配置。每个配置都应以“key=value”的格式指定。主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
–catalog-conf Paimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-conf Paimon 表sink的配置。每个配置都应以“key=value”的格式指定。

只有具有主键的表才会被同步。

对于每个需要同步的MySQL表,如果对应的Paimon表不存在,该操作会自动创建该表。其schema将从所有指定的 MySQL 表派生。如果 Paimon 表已存在,则其schema将与所有指定 MySQL 表的schema进行比较。

2)案例实操

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
mysql-sync-database \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table-prefix “ods_” \
–table-suffix “_cdc” \
–mysql-conf hostname=hadoop102 \
–mysql-conf username=root \
–mysql-conf password=000000 \
–mysql-conf database-name=gmall \
–catalog-conf metastore=hive \
–catalog-conf uri=thrift://hadoop102:9083 \
–table-conf bucket=4 \
–table-conf changelog-producer=input \
–table-conf sink.parallelism=4 \
–including-tables ‘user_info|order_info|activity_rule’

3)同步数据库下新添加的表

首先假设 Flink 作业正在同步数据库 source_db 下的表 [product、user、address]。提交作业的命令如下所示:

/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-database \
–warehouse hdfs:///path/to/warehouse \
–database test_db \
–mysql-conf hostname=127.0.0.1 \
–mysql-conf username=root \
–mysql-conf password=123456 \
–mysql-conf database-name=source_db \
–catalog-conf metastore=hive \
–catalog-conf uri=thrift://hive-metastore:9083 \
–table-conf bucket=4 \
–table-conf changelog-producer=input \
–table-conf sink.parallelism=4 \
–including-tables ‘product|user|address’

稍后,我们希望作业也同步包含历史数据的表 [order, custom]。我们可以通过从作业的先前快照中恢复并从而重用作业的现有状态来实现这一点。恢复的作业将首先对新添加的表进行快照,然后自动从之前的位置继续读取变更日志。

从以前的快照恢复并添加新表进行同步的命令如下所示:

/bin/flink run \
–fromSavepoint savepointPath \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
mysql-sync-database \
–warehouse hdfs:///path/to/warehouse \
–database test_db \
–mysql-conf hostname=127.0.0.1 \
–mysql-conf username=root \
–mysql-conf password=123456 \
–mysql-conf database-name=source_db \
–catalog-conf metastore=hive \
–catalog-conf uri=thrift://hive-metastore:9083 \
–table-conf bucket=4 \
–including-tables ‘product|user|address|order|custom’

2.8.2 Kafka

Flink 提供了几种 Kafka CDC 格式:canal-json、debezium-json、ogg-json、maxwell-json。如果 Kafka 主题中的消息是使用更改数据捕获 (CDC) 工具从另一个数据库捕获的更改事件,则您可以使用 Paimon Kafka CDC。将解析后的INSERT、UPDATE、DELETE消息写入到paimon表中。Paimon官网列出支持的格式如下:

添加Kafka连接器:

cp flink-sql-connector-kafka-1.17.0.jar /opt/module/flink-1.17.0/lib

重启yarn-session集群和sql-client。

2.8.2.1 同步表

1)语法说明

将 Kafka 的一个主题中的一张或多张表同步到一张 Paimon 表中。

/bin/flink run \
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
kafka-sync-table
–warehouse \
–database \
–table \
[–partition-keys ] \
[–primary-keys ] \
[–computed-column <‘column-name=expr-name(args[, …])’> [–computed-column …]] \
[–kafka-conf [–kafka-conf …]] \
[–catalog-conf [–catalog-conf …]] \
[–table-conf [–table-conf …]]

参数说明

配置 描述
–warehouse Paimon仓库路径。
–database Paimon Catalog中的数据库名称。
–table Paimon 表名称。
–partition-keys Paimon 表的分区键。如果有多个分区键,请用逗号连接,例如“dt,hh,mm”。
–primary-keys Paimon 表的主键。如果有多个主键,请用逗号连接,例如“buyer_id,seller_id”。
–computed-column 计算列的定义。参数字段来自 Kafka 主题的表字段名称。
–kafka-conf Flink Kafka 源的配置。每个配置都应以“key=value”的格式指定。 properties.bootstrap.serverstopicproperties.group.idvalue.format 是必需配置,其他配置是可选的。
–catalog-conf Paimon Catalog的配置。每个配置都应以“key=value”的格式指定。
–table-conf Paimon 表sink的配置。每个配置都应以“key=value”的格式指定。

如果您指定的 Paimon 表不存在,此操作将自动创建该表。它的schema将从所有指定的Kafka topic的表中派生出来,它从topic中获取最早的非DDL数据解析schema。如果 Paimon 表已存在,则其schema将与所有指定 Kafka 主题表的schema进行比较。

2)案例实操

(1)准备数据(canal-json格式)

为了方便,直接将canal格式的数据插入topic里(user_info单表数据):

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal

#插入数据如下:

{“data”:[{“id”:“6”,“login_name”:“t7dk2h”,“nick_name”:“冰冰11”,“passwd”:null,“name”:“淳于冰”,“phone_num”:“13178654378”,“email”:“t7dk2h@263.net”,“head_img”:null,“user_level”:“1”,“birthday”:“1997-12-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689150607000,“id”:1,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“冰冰”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151566836,“type”:“UPDATE”}
{“data”:[{“id”:“7”,“login_name”:“vihcj30p1”,“nick_name”:“豪心22”,“passwd”:null,“name”:“魏豪心”,“phone_num”:“13956932645”,“email”:“vihcj30p1@live.com”,“head_img”:null,“user_level”:“1”,“birthday”:“1991-06-07”,“gender”:“M”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151623000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“豪心”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151623139,“type”:“UPDATE”}
{“data”:[{“id”:“8”,“login_name”:“02r2ahx”,“nick_name”:“卿卿33”,“passwd”:null,“name”:“穆卿”,“phone_num”:“13412413361”,“email”:“02r2ahx@sohu.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-07-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151626000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“卿卿”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151626863,“type”:“UPDATE”}
{“data”:[{“id”:“9”,“login_name”:“mjhrxnu”,“nick_name”:“武新44”,“passwd”:null,“name”:“罗武新”,“phone_num”:“13617856358”,“email”:“mjhrxnu@yahoo.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-08-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151630000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“武新”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151630781,“type”:“UPDATE”}
{“data”:[{“id”:“10”,“login_name”:“kwua2155”,“nick_name”:“纨纨55”,“passwd”:null,“name”:“姜纨”,“phone_num”:“13742843828”,“email”:“kwua2155@163.net”,“head_img”:null,“user_level”:“3”,“birthday”:“1997-11-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151633000,“id”:5,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“纨纨”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151633697,“type”:“UPDATE”}

(2)从一个 Kafka 主题(包含单表数据)同步到 Paimon表

bin/flink run \
  /opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
  kafka-sync-table \
  --warehouse hdfs://hadoop102:8020/paimon/hive \
  --database test \
  --table kafka_user_info_cdc \
  --primary-keys id \
  --kafka-conf properties.bootstrap.servers=hadoop102:9092 \
  --kafka-conf topic=paimon_canal \
--kafka-conf properties.group.id=atguigu \
--kafka-conf scan.startup.mode=earliest-offset \
  --kafka-conf value.format=canal-json \
  --catalog-conf metastore=hive \
  --catalog-conf uri=thrift://hadoop102:9083 \
  --table-conf bucket=4 \
  --table-conf changelog-producer=input \
  --table-conf sink.parallelism=4

2.8.2.2 同步数据库

1)语法说明

将多个主题或一个主题同步到一个 Paimon 数据库中。

<FLINK_HOME>/bin/flink run \
  /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
  kafka-sync-database
  --warehouse <warehouse-path> \
  --database <database-name> \
  [--schema-init-max-read <int>] \
  [--ignore-incompatible <true/false>] \
  [--table-prefix <paimon-table-prefix>] \
  [--table-suffix <paimon-table-suffix>] \
  [--including-tables <table-name|name-regular-expr>] \
  [--excluding-tables <table-name|name-regular-expr>] \
  [--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
  [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
  [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

参数说明:

配置 描述
–warehouse The path to Paimon warehouse.通往派蒙仓库的道路。
–database Paimon 目录中的数据库名称。
–schema-init-max-read 如果您的表全部来自某个Topic,您可以设置该参数来初始化需要同步的表数量。默认值为 1000。
–ignore-incompatible 默认为 false,在这种情况下,如果 Paimon 中存在 MySQL 表名,并且它们的 schema 不兼容,则会抛出异常。您可以显式将其指定为 true 以忽略不兼容的表和异常。
–table-prefix 所有需要同步的Paimon表的前缀。例如,如果您希望所有同步表都以“ods_”作为前缀,则可以指定“–table-prefix ods_”。
–table-suffix 所有需要同步的Paimon表的后缀。用法与“–table-prefix”相同。
–including-tables 用于指定要同步哪些源表。您必须使用“|”分隔多个表。因为“|”为特殊字符,需要逗号,例如:‘a|b|c’。支持正则表达式,例如指定“–include-tables test|paimon.*”表示同步表’test’和所有表都以“paimon”开头。
–excluding-tables 用于指定哪些源表不同步。用法与“–include-tables”相同。如果同时指定了“-- except-tables”,则“-- except-tables”的优先级高于“–include-tables”。
–kafka-conf Flink Kafka 源的配置。每个配置都应以“key=value”的格式指定。 properties.bootstrap.serverstopicproperties.group.idvalue.format 是必需配置,其他配置是可选的。有关完整配置列表,请参阅其文档。
–catalog-conf Paimon 目录的配置。每个配置都应以“key=value”的格式指定。请参阅此处以获取目录配置的完整列表。
–table-conf Paimon 餐桌水槽的配置。每个配置都应以“key=value”的格式指定。请参阅此处了解表配置的完整列表。

只有具有主键的表才会被同步。

对于每个要同步的Kafka主题的表,如果对应的Paimon表不存在,该操作将自动创建该表。它的schema将从所有指定的Kafka topic的表中派生出来,它从topic中获取最早的非DDL数据解析schema。如果 Paimon 表已存在,则其schema将与所有指定 Kafka 主题表的schema进行比较。

2)案例实操

(1)准备数据(canal-json格式)

为了方便,直接将canal格式的数据插入topic里(user_info和spu_info多表数据):

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_2

#插入数据如下(注意不要有空行):

{“data”:[{“id”:“6”,“login_name”:“t7dk2h”,“nick_name”:“冰冰11”,“passwd”:null,“name”:“淳于冰”,“phone_num”:“13178654378”,“email”:“t7dk2h@263.net”,“head_img”:null,“user_level”:“1”,“birthday”:“1997-12-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689150607000,“id”:1,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“冰冰”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151566836,“type”:“UPDATE”}
{“data”:[{“id”:“7”,“login_name”:“vihcj30p1”,“nick_name”:“豪心22”,“passwd”:null,“name”:“魏豪心”,“phone_num”:“13956932645”,“email”:“vihcj30p1@live.com”,“head_img”:null,“user_level”:“1”,“birthday”:“1991-06-07”,“gender”:“M”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151623000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“豪心”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151623139,“type”:“UPDATE”}
{“data”:[{“id”:“8”,“login_name”:“02r2ahx”,“nick_name”:“卿卿33”,“passwd”:null,“name”:“穆卿”,“phone_num”:“13412413361”,“email”:“02r2ahx@sohu.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-07-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151626000,“id”:3,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“卿卿”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151626863,“type”:“UPDATE”}
{“data”:[{“id”:“9”,“login_name”:“mjhrxnu”,“nick_name”:“武新44”,“passwd”:null,“name”:“罗武新”,“phone_num”:“13617856358”,“email”:“mjhrxnu@yahoo.com”,“head_img”:null,“user_level”:“1”,“birthday”:“2001-08-08”,“gender”:null,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151630000,“id”:4,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“武新”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151630781,“type”:“UPDATE”}
{“data”:[{“id”:“10”,“login_name”:“kwua2155”,“nick_name”:“纨纨55”,“passwd”:null,“name”:“姜纨”,“phone_num”:“13742843828”,“email”:“kwua2155@163.net”,“head_img”:null,“user_level”:“3”,“birthday”:“1997-11-08”,“gender”:“F”,“create_time”:“2022-06-08 00:00:00”,“operate_time”:null,“status”:null}],“database”:“gmall”,“es”:1689151633000,“id”:5,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“login_name”:“varchar(200)”,“nick_name”:“varchar(200)”,“passwd”:“varchar(200)”,“name”:“varchar(200)”,“phone_num”:“varchar(200)”,“email”:“varchar(200)”,“head_img”:“varchar(200)”,“user_level”:“varchar(200)”,“birthday”:“date”,“gender”:“varchar(1)”,“create_time”:“datetime”,“operate_time”:“datetime”,“status”:“varchar(200)”},“old”:[{“nick_name”:“纨纨”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“login_name”:12,“nick_name”:12,“passwd”:12,“name”:12,“phone_num”:12,“email”:12,“head_img”:12,“user_level”:12,“birthday”:91,“gender”:12,“create_time”:93,“operate_time”:93,“status”:12},“table”:“user_info”,“ts”:1689151633697,“type”:“UPDATE”}
{“data”:[{“id”:“12”,“spu_name”:“华为智慧屏 4K全面屏智能电视机1”,“description”:“华为智慧屏 4K全面屏智能电视机”,“category3_id”:“86”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151648000,“id”:6,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“华为智慧屏 4K全面屏智能电视机”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151648872,“type”:“UPDATE”}
{“data”:[{“id”:“3”,“spu_name”:“Apple iPhone 13”,“description”:“Apple iPhone 13”,“category3_id”:“61”,“tm_id”:“2”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151661000,“id”:7,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“Apple iPhone 12”,“description”:“Apple iPhone 12”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151661828,“type”:“UPDATE”}
{“data”:[{“id”:“4”,“spu_name”:“HUAWEI P50”,“description”:“HUAWEI P50”,“category3_id”:“61”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151669000,“id”:8,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“HUAWEI P40”,“description”:“HUAWEI P40”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151669966,“type”:“UPDATE”}
{“data”:[{“id”:“1”,“spu_name”:“小米12sultra”,“description”:“小米12”,“category3_id”:“61”,“tm_id”:“1”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151700000,“id”:9,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“description”:“小米10”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151700998,“type”:“UPDATE”}

再准备一个只包含spu_info单表数据的Topic:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic paimon_canal_1

#插入数据如下:

{“data”:[{“id”:“12”,“spu_name”:“华为智慧屏 4K全面屏智能电视机1”,“description”:“华为智慧屏 4K全面屏智能电视机”,“category3_id”:“86”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151648000,“id”:6,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“华为智慧屏 4K全面屏智能电视机”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151648872,“type”:“UPDATE”}
{“data”:[{“id”:“3”,“spu_name”:“Apple iPhone 13”,“description”:“Apple iPhone 13”,“category3_id”:“61”,“tm_id”:“2”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151661000,“id”:7,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“Apple iPhone 12”,“description”:“Apple iPhone 12”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151661828,“type”:“UPDATE”}
{“data”:[{“id”:“4”,“spu_name”:“HUAWEI P50”,“description”:“HUAWEI P50”,“category3_id”:“61”,“tm_id”:“3”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151669000,“id”:8,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“spu_name”:“HUAWEI P40”,“description”:“HUAWEI P40”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151669966,“type”:“UPDATE”}
{“data”:[{“id”:“1”,“spu_name”:“小米12sultra”,“description”:“小米12”,“category3_id”:“61”,“tm_id”:“1”,“create_time”:“2021-12-14 00:00:00”,“operate_time”:null}],“database”:“gmall”,“es”:1689151700000,“id”:9,“isDdl”:false,“mysqlType”:{“id”:“bigint”,“spu_name”:“varchar(200)”,“description”:“varchar(1000)”,“category3_id”:“bigint”,“tm_id”:“bigint”,“create_time”:“datetime”,“operate_time”:“datetime”},“old”:[{“description”:“小米10”}],“pkNames”:[“id”],“sql”:“”,“sqlType”:{“id”:-5,“spu_name”:12,“description”:12,“category3_id”:-5,“tm_id”:-5,“create_time”:93,“operate_time”:93},“table”:“spu_info”,“ts”:1689151700998,“type”:“UPDATE”}

(2)从一个 Kafka 主题(包含多表数据)同步到 Paimon 数据库

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
kafka-sync-database \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table-prefix “t1_” \
–table-suffix “_cdc” \
–schema-init-max-read 500 \
–kafka-conf properties.bootstrap.servers=hadoop102:9092 \
–kafka-conf topic=paimon_canal_2 \
–kafka-conf properties.group.id=atguigu \
–kafka-conf scan.startup.mode=earliest-offset \
–kafka-conf value.format=canal-json \
–catalog-conf metastore=hive \
–catalog-conf uri=thrift://hadoop102:9083 \
–table-conf bucket=4 \
–table-conf changelog-producer=input \
–table-conf sink.parallelism=4

从多个 Kafka 主题同步到 Paimon 数据库

bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
kafka-sync-database \
–warehouse hdfs://hadoop102:8020/paimon/hive \
–database test \
–table-prefix “t2_” \
–table-suffix “_cdc” \
–kafka-conf properties.bootstrap.servers=hadoop102:9092 \
–kafka-conf topic=“paimon_canal;paimon_canal_1” \
–kafka-conf properties.group.id=atguigu \
–kafka-conf scan.startup.mode=earliest-offset \
–kafka-conf value.format=canal-json \
–catalog-conf metastore=hive \
–catalog-conf uri=thrift://hadoop102:9083 \
–table-conf bucket=4 \
–table-conf changelog-producer=input \
–table-conf sink.parallelism=4

2.8.3 支持的schema变更

cdc 集成支持有限的schema变更。目前,框架无法删除列,因此 DROP 的行为将被忽略,RENAME 将添加新列。当前支持的架构更改包括:

(1)添加列。

(2)更改列类型:

从字符串类型(char、varchar、text)更改为长度更长的另一种字符串类型,

从二进制类型(binary、varbinary、blob)更改为长度更长的另一种二进制类型,

从整数类型(tinyint、smallint、int、bigint)更改为范围更广的另一种整数类型,

从浮点类型(float、double)更改为范围更宽的另一种浮点类型。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1355 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3
|
1月前
|
监控 NoSQL Java
Spring Boot集成Redis启动失败【Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.G】
Spring Boot集成Redis启动失败【Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.G】
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0
|
1月前
|
SQL 分布式计算 Apache
生态 | Apache Hudi集成Apache Zeppelin
生态 | Apache Hudi集成Apache Zeppelin
33 0
|
1月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
86 1
|
1月前
|
缓存 分布式计算 Apache
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
61 0
|
1月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
67 0
|
1月前
|
存储 SQL 分布式计算
KIP-5:Apache Kylin深度集成Hudi
KIP-5:Apache Kylin深度集成Hudi
28 0
|
1月前
|
SQL Java Apache
超详细步骤!整合Apache Hudi + Flink + CDH
超详细步骤!整合Apache Hudi + Flink + CDH
83 0

推荐镜像

更多