开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC有个场景,我有两个不同数据类型的topic:a,b,这个有人遇到过吗?

Flink CDC有个场景,我有两个不同数据类型的topic:a,b,他们分别又有相同数据类型的后缀为_grey的灰度用的topic: a_grey,b_grey

a_grey和b_grey分别是用来对应a,b进行灰度切换的,灰度流程是先灰度部分数据,后面全量切换,a -> a_grey, b -> b_grey,下一次灰度就是b_grey -> b, a_grey -> a。

我会用datastream api,去拉取a,a_grey进行union,withTimestampAssigner,使用事件时间戳
用datastream api,去拉取b,b_grey进行unionr,使用事件时间戳
然后去将union之后的stream转换为table,a_union_table和b_union_table 然后用flink sql进行left interval join,a_union_table left interval join b_union_table,获取数据再转为stream,用stream api进行mapper操作,最后写入数据库。

a,a_grey,b,b_grey都有8个分区,
a和a_grey会发送到所有的8个分区有数据
但是b,b_grey,只会发送到里面四个分区,其他四个分区没有数据

现在的问题是每次灰度全量切换完成之后,flink的水印就会推进不了,停留在切换的kafka数据时间戳附近,推进不了,请问下,这个有人遇到过吗?是什么原因,可以怎么解决?flink 1.17.1和1.14.5都不行

我尝试过withIdleness,或者不用withTimestampAssigner,但是在下次切换的时候又出这种问题了?

展开
收起
真的很搞笑 2023-11-22 08:17:06 59 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    可能是因为Kafka分区不均衡导致的。由于b和b_grey只向4个分区发送数据,而a和a_grey则向所有8个分区发送数据,因此当您切换到b和b_grey时,Flink可能需要更长时间来处理所有分区的数据,从而导致水印推进困难。
    为了缓解这个问题,您可以考虑以下几种方法:

    • 均衡分配数据:确保每个Kafka分区都接收到足够多的数据,这样Flink就可以同时处理多个分区,从而更快地推进水印。
    • 提高吞吐量:通过增加任务并发度或优化代码等方式,提高Flink处理数据的能力,使水印能够快速推进。
    • 调整水印策略:在某些情况下,您可以尝试调整水印策略,例如降低水印阈值,允许更多的乱序数据进入窗口,或者启用懒评估模式等。
    2023-11-29 14:10:53
    赞同 展开评论 打赏
  • 这个问题可能是由于Flink在处理灰度切换时,时间戳推进不准确导致的。你可以尝试以下方法解决这个问题:

    1. 使用AssignerWithPeriodicWatermarks接口自定义一个带有周期性水印的时间戳分配器,这样可以更好地控制水印的生成和推进。例如:
    import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    
    public class CustomTimestampAssigner implements AssignerWithPeriodicWatermarks<Row> {
        private long currentMaxTimestamp;
    
        @Override
        public long extractTimestamp(Row element, long previousElementTimestamp) {
            // 提取事件时间戳并更新currentMaxTimestamp
            long timestamp = element.getField(0); // 假设时间戳字段为第一个字段
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
            return timestamp;
        }
    
        @Override
        public Watermark getCurrentWatermark() {
            // 生成当前的最大水印
            long maxTimestamp = currentMaxTimestamp - 5000; // 假设允许的最大延迟为5秒
            return new Watermark(maxTimestamp);
        }
    }
    
    1. 在使用withTimestampAssigner时,传入自定义的时间戳分配器:
    DataStream<Row> aStream = env.addSource(new FlinkKafkaConsumer<>(...))
        .assignTimestampsAndWatermarks(new CustomTimestampAssigner())
        .keyBy(...)
        .map(...);
    
    1. 如果问题仍然存在,可以尝试调整getCurrentWatermark中的延迟阈值,以适应不同的数据流情况。
    2023-11-29 13:54:24
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载