请教下大家Flink CDC,目前绝大多数的Source ChangelogMode都是ALL,直接 Insert Hive,Kafka之类的Sink Connector,会直接没法同步呀,这块是怎么考虑的呢?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink CDC的Source ChangelogMode为ALL
时,表示其输出的变更日志流包含所有类型的变更事件(INSERT、UPDATE_BEFORE、UPDATE_AFTER和DELETE)。然而,当直接将这些变更日志写入某些Sink(如Hive或Kafka)时,可能会遇到无法正确同步的问题。这是因为不同的Sink对变更日志的支持能力不同,需要根据具体场景进行适配和优化。
以下是针对该问题的详细分析和解决方案:
ALL
模式输出变更日志流,包含完整的变更信息。ALL
模式下的变更日志流,尤其是UPDATE_BEFORE
和UPDATE_AFTER
事件。UPDATE
或DELETE
事件。UPSERT
语义(即INSERT
和UPDATE
合并)。UPDATE_BEFORE
和UPDATE_AFTER
),而目标Sink可能只支持简单的INSERT
操作。UPSERT
changelog-mode
参数设置为UPSERT
,仅输出INSERT
、DELETE
和UPDATE_AFTER
事件。WITH (
'connector' = 'mysql-cdc',
'hostname' = '<hostname>',
'port' = '<port>',
'username' = '<username>',
'password' = '<password>',
'database-name' = '<database>',
'table-name' = '<table>',
'changelog-mode' = 'upsert'
)
UPSERT
语义的Sink(如Kafka Upsert Connector)配合使用。UPSERT
语义,可以在Flink作业中添加中间层,将变更日志流转换为目标Sink可接受的格式。示例:将UPDATE_BEFORE
和UPDATE_AFTER
合并为一条UPSERT
记录。
CREATE TABLE transformed_table (
id BIGINT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'transformed_topic',
'properties.bootstrap.servers' = '<kafka-brokers>',
'format' = 'json'
);
INSERT INTO transformed_table
SELECT
id,
CASE
WHEN op_type = '+I' OR op_type = '+U' THEN after_name
ELSE NULL
END AS name
FROM source_table;
INSERT
事件并丢弃UPDATE
和DELETE
事件。
INSERT INTO hive_table
SELECT * FROM source_table
WHERE op_type = '+I';
UPSERT
语义并能正确处理主键冲突。CREATE TABLE kafka_sink (
id BIGINT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'sink_topic',
'properties.bootstrap.servers' = '<kafka-brokers>',
'key.format' = 'json',
'value.format' = 'json'
);
UPSERT
语义。table.exec.source.cdc-events-duplicate
参数,并通过主键去重。通过调整Flink CDC的changelog-mode
参数、使用中间层进行数据转换或选择合适的Sink Connector(如Upsert Kafka),可以有效解决ALL
模式下变更日志流无法直接同步的问题。同时,需注意目标Sink的能力限制,并根据具体需求进行适配和优化。