flink sql中使用upsert-kafka作为数据源,请问是什么原因?

flink sql中使用upsert-kafka作为数据源,做一条记录的修改操作后,使用doris-connector向doris中写入的数据会出现修改前和修改后一共2条记录,请问是什么原因?是的,当对1条数据进行修改后,doris中明细模型中就存在1条原始数据 和 新产生1条修改前的原始数据+1条修改后的数据,一共3条记录了

展开
收起
真的很搞笑 2023-10-04 07:42:46 392 分享 版权
1 条回答
写回答
取消 提交回答
  • 这个问题可能是由于upsert-kafka和doris-connector的不兼容性导致的。具体的原因和解决方案如下:

    • 原因:upsert-kafka作为数据源时,会生成一个changelog流,其中每条数据记录都表示一个更新或删除事件。当对一条数据进行修改时,upsert-kafka会将修改前的数据作为DELETE事件,将修改后的数据作为INSERT/UPDATE事件,输出到changelog流中。而doris-connector作为数据汇时,会消费changelog流,并将数据写入到doris中。但是,doris-connector目前还不支持DELETE事件,只能处理INSERT/UPDATE事件。因此,当doris-connector接收到upsert-kafka输出的changelog流时,它会忽略DELETE事件,只写入INSERT/UPDATE事件。这就导致了doris中出现了修改前和修改后两条记录。
    • 解决方案:有两种可能的解决方案:
      • 一种是修改doris-connector的源码,让它支持DELETE事件,并重新编译打包。这样就可以让doris-connector正确地处理upsert-kafka输出的changelog流,并只保留最新的数据记录。
      • 另一种是不使用upsert-kafka作为数据源,而是使用普通的kafka connector,并自己实现changelog流的生成和消费逻辑。这样就可以避免upsert-kafka和doris-connector之间的不兼容性问题。
    2023-10-17 10:27:11
    赞同 1 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理