开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink sql中使用upsert-kafka作为数据源,请问是什么原因?

flink sql中使用upsert-kafka作为数据源,做一条记录的修改操作后,使用doris-connector向doris中写入的数据会出现修改前和修改后一共2条记录,请问是什么原因?是的,当对1条数据进行修改后,doris中明细模型中就存在1条原始数据 和 新产生1条修改前的原始数据+1条修改后的数据,一共3条记录了

展开
收起
真的很搞笑 2023-10-04 07:42:46 261 0
1 条回答
写回答
取消 提交回答
  • 这个问题可能是由于upsert-kafka和doris-connector的不兼容性导致的。具体的原因和解决方案如下:

    • 原因:upsert-kafka作为数据源时,会生成一个changelog流,其中每条数据记录都表示一个更新或删除事件。当对一条数据进行修改时,upsert-kafka会将修改前的数据作为DELETE事件,将修改后的数据作为INSERT/UPDATE事件,输出到changelog流中。而doris-connector作为数据汇时,会消费changelog流,并将数据写入到doris中。但是,doris-connector目前还不支持DELETE事件,只能处理INSERT/UPDATE事件。因此,当doris-connector接收到upsert-kafka输出的changelog流时,它会忽略DELETE事件,只写入INSERT/UPDATE事件。这就导致了doris中出现了修改前和修改后两条记录。
    • 解决方案:有两种可能的解决方案:
      • 一种是修改doris-connector的源码,让它支持DELETE事件,并重新编译打包。这样就可以让doris-connector正确地处理upsert-kafka输出的changelog流,并只保留最新的数据记录。
      • 另一种是不使用upsert-kafka作为数据源,而是使用普通的kafka connector,并自己实现changelog流的生成和消费逻辑。这样就可以避免upsert-kafka和doris-connector之间的不兼容性问题。
    2023-10-17 10:27:11
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载