Flink CDC读取sqlserver的update,输出到kafka的消息op标志只有c,这个要怎么解决?困扰了比较久,读取mysql会是两条消息,一条d,一条c
在 Flink CDC 中,更新操作将会产生两行记录,一行表示删除旧值,另一行表示插入新的值。这是因为 Flink CDC 认为更新是一个删除操作和插入操作组成的事务。
如果您想要只发送一条包含更新操作的信息,请使用 delta.fields.include.*.before
参数,将其设置为 false。这将使 Flink CDC 只发送包含新值的单个消息:
tableEnv.executeSql(
"CREATE TABLE myschema.myTable (" +
"rowtime TIMESTAMP(3), " +
"data JSON," +
"PROCTIME as rowtime," +
"WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND," +
"PRIMARY KEY (data.id)" +
") WITH (" +
"'connector.type' = 'kafka', " +
"'connector.topic' = 'myTopic'," +
"'connector.group-id' = 'testGroup'," +
"'connector.bootstrap.servers' = 'localhost:9092'," +
"'connector.data.format' = 'json'," +
"'connector.topic-partition-discovery.interval' = '5 s'," +
"'connector.startup-mode' = 'latest-offset'," +
"'connector.property-version' = 'V2'," +
"'connector.delta.fields.include.*.before' = 'false'" +
")"
);
在上面的代码中,我们设置了 delta.fields.include.*.before
参数为 false,使其不再包含旧值。请注意,这只会将插入操作和删除操作的全部消息都作为单个消息发送。
这个问题可能是由于Flink CDC在处理MySQL的UPDATE操作时,没有正确识别到变更数据。为了解决这个问题,你可以尝试以下方法:
确保你的MySQL表有一个唯一的主键或者唯一索引,这样Flink CDC才能准确地识别出哪些行发生了变更。
检查你的MySQL表是否有触发器或者其他外部操作,这些操作可能会影响Flink CDC的读取结果。如果有,请确保这些操作不会影响数据的一致性。
尝试使用不同的Flink CDC版本,看看是否能解决问题。有时候,不同版本的Flink CDC可能存在一些已知的问题。
如果以上方法都无法解决问题,你可以考虑使用其他工具(如Debezium)来替代Flink CDC,因为Debezium对MySQL的支持可能更好。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。