flink cdc mysql写入kafka,update的时候产生俩条数据这个一般怎么处理?能否变成一条呢?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用 Flink CDC 从 MySQL 写入 Kafka 时,UPDATE
操作会产生两条数据(一条 DELETE
和一条 INSERT
),这是由于 Flink 目前不支持直接处理同时包含 UPDATE_BEFORE
和 UPDATE_AFTER
的单个 UPDATE
类型消息。这种行为是 Flink 在写入下游存储时的默认处理方式。
当 Flink 接收到 MySQL 的变更日志(例如通过 Debezium 或 Canal 格式)时,UPDATE
操作会被拆分为: 1. DELETE
消息:表示旧数据的删除(对应 UPDATE_BEFORE
)。 2. INSERT
消息:表示新数据的插入(对应 UPDATE_AFTER
)。
这种拆分的主要原因是 Flink SQL 的设计限制,无法将 UPDATE_BEFORE
和 UPDATE_AFTER
合并为一条 UPDATE
消息。
如果希望将 UPDATE
操作合并为一条消息,可以考虑以下几种方法:
Flink 提供了 Upsert Kafka 结果表,专门用于处理包含变更数据(如 INSERT
、UPDATE
、DELETE
)的场景。Upsert Kafka 会根据主键自动合并 UPDATE
操作,从而避免产生两条消息。
实现步骤: 1. 定义 Upsert Kafka 表: 在 Flink SQL 中定义一个 Upsert Kafka 表,并指定主键。
CREATE TABLE kafka_sink (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'your_kafka_broker',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO kafka_sink
SELECT * FROM mysql_source;
效果: - Upsert Kafka 会根据主键自动合并 UPDATE
操作,确保每条主键只保留最新的一条数据。 - 这种方式适用于需要将变更数据同步到 Kafka 并保持数据一致性的场景。
如果无法使用 Upsert Kafka,可以通过自定义逻辑将 DELETE
和 INSERT
消息合并为一条 UPDATE
消息。
实现步骤: 1. 解析 Debezium/Canal 消息: 使用 Flink SQL 或 DataStream API 解析 Kafka 中的消息,提取 before
和 after
字段。
CREATE TABLE kafka_source (
before ROW<id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2)>,
after ROW<id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2)>,
op STRING,
ts_ms BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'your_kafka_broker',
'format' = 'debezium-json'
);
合并消息: 使用 Flink SQL 或 DataStream API 将 DELETE
和 INSERT
消息合并为一条 UPDATE
消息。
SELECT
after.id AS id,
after.name AS name,
after.description AS description,
after.weight AS weight,
'UPDATE' AS op
FROM kafka_source
WHERE op = 'u';
写入目标 Kafka: 将合并后的消息写入目标 Kafka Topic。
INSERT INTO kafka_target
SELECT * FROM merged_stream;
效果: - 自定义逻辑可以灵活处理消息格式,但需要额外开发和维护成本。
如果无法在 Flink 中合并消息,可以在下游系统中处理两条消息。例如: - Kafka 消费端:在消费 Kafka 数据时,根据主键合并 DELETE
和 INSERT
消息。 - Paimon 表:如果下游使用 Paimon 表,可以通过设置 merge-engine
参数(如 partial-update
)来合并多条消息。
示例:
CREATE TABLE paimon_sink (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'merge-engine' = 'partial-update'
);
效果: - 下游系统可以根据主键自动合并消息,减少对 Flink 的依赖。
hash-by-key
)。如果希望将 UPDATE
操作合并为一条消息,推荐优先使用 Upsert Kafka 结果表,这是最简单且高效的方式。如果无法使用 Upsert Kafka,可以通过自定义逻辑或调整下游消费逻辑来实现消息合并。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。