Flink CDC读取mysql到kafka,更新的话op不应该是u吗,出来是d和c?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用Flink CDC读取MySQL数据并写入Kafka时,您提到更新操作(UPDATE
)的消息类型在Kafka中表现为DELETE
(D
)和INSERT
(C
),而不是预期的UPDATE
(U
)。这种现象是由于Flink CDC的实现机制以及Kafka连接器的行为决定的。以下是详细解释和解决方案:
Flink CDC通过监听MySQL的Binlog捕获数据变更,并将这些变更解析为Flink内部的Changelog流。根据知识库中的描述,Flink CDC支持以下四种消息类型: - +I
:表示INSERT
消息。 - -D
:表示DELETE
消息。 - -U
:表示UPDATE_BEFORE
消息(更新前镜像)。 - +U
:表示UPDATE_AFTER
消息(更新后镜像)。
然而,在将这些消息写入Kafka时,Kafka连接器会将UPDATE
操作拆分为两条独立的消息: - DELETE
(D
):表示删除旧记录(即UPDATE_BEFORE
部分)。 - INSERT
(C
):表示插入新记录(即UPDATE_AFTER
部分)。
这种行为的原因在于Kafka的Upsert语义: - Kafka Upsert连接器将每条消息视为一个UPSERT
操作(即插入或更新)。 - 如果消息的值为空,则表示删除操作。 - 因此,UPDATE
操作被拆分为两条消息:一条删除旧值,一条插入新值。
如果您需要确认Flink CDC输出的消息类型,可以通过以下方式检查: - 使用EXPLAIN CHANGELOG_MODE
命令查看SQL查询的Changelog模式:
EXPLAIN CHANGELOG_MODE SELECT * FROM mysql_source;
输出结果会显示每个节点的消息类型(如+I
、-D
、-U
、+U
等)。
kafka-console-consumer --bootstrap-server <broker_address> --topic <topic_name> --from-beginning
如果希望在Kafka中直接获取UPDATE
消息(U
),可以考虑以下方法:
Debezium格式支持将UPDATE
操作编码为包含before
和after
字段的JSON消息。例如:
{
"before": {
"id": 1,
"name": "old_value"
},
"after": {
"id": 1,
"name": "new_value"
},
"op": "u"
}
配置步骤: 1. 在Flink SQL中定义Kafka表时,指定format
为debezium-json
:
CREATE TABLE kafka_sink (
id BIGINT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'mysql_topic',
'properties.bootstrap.servers' = '<broker_address>',
'format' = 'debezium-json'
);
DELETE
和INSERT
消息如果必须使用Kafka Upsert语义,可以在下游处理逻辑中合并DELETE
和INSERT
消息。例如: - 使用Flink SQL的JOIN
操作将DELETE
和INSERT
消息关联起来。 - 或者在消费Kafka数据时,通过自定义逻辑将两条消息合并为一条UPDATE
消息。
在某些场景下,可以通过调整Kafka Sink的配置来优化消息输出。例如: - 启用sink.ignore-null-when-update
参数,避免生成多余的DELETE
消息。 - 配置key.format
以确保消息按主键分区,从而保证同一主键的消息顺序。
UPDATE
拆分为DELETE
和INSERT
可能会增加Kafka的消息量,进而影响吞吐量和存储成本。UPDATE
操作,可能需要额外的逻辑来合并DELETE
和INSERT
消息。Flink CDC将MySQL的UPDATE
操作拆分为DELETE
和INSERT
消息是Kafka Upsert语义的默认行为。如果需要直接获取UPDATE
消息,可以使用Debezium格式或在下游逻辑中合并消息。根据实际需求选择合适的方案,并注意性能和复杂度的权衡。