Flink CDC有个场景,我有两个不同数据类型的topic:a,b,他们分别又有相同数据类型的后缀为_grey的灰度用的topic: a_grey,b_grey
a_grey和b_grey分别是用来对应a,b进行灰度切换的,灰度流程是先灰度部分数据,后面全量切换,a -> a_grey, b -> b_grey,下一次灰度就是b_grey -> b, a_grey -> a。
我会用datastream api,去拉取a,a_grey进行union,withTimestampAssigner,使用事件时间戳
用datastream api,去拉取b,b_grey进行unionr,使用事件时间戳
然后去将union之后的stream转换为table,a_union_table和b_union_table 然后用flink sql进行left interval join,a_union_table left interval join b_union_table,获取数据再转为stream,用stream api进行mapper操作,最后写入数据库。
a,a_grey,b,b_grey都有8个分区,
a和a_grey会发送到所有的8个分区有数据
但是b,b_grey,只会发送到里面四个分区,其他四个分区没有数据
现在的问题是每次灰度全量切换完成之后,flink的水印就会推进不了,停留在切换的kafka数据时间戳附近,推进不了,请问下,这个有人遇到过吗?是什么原因,可以怎么解决?flink 1.17.1和1.14.5都不行
我尝试过withIdleness,或者不用withTimestampAssigner,但是在下次切换的时候又出这种问题了?
可能是因为Kafka分区不均衡导致的。由于b和b_grey只向4个分区发送数据,而a和a_grey则向所有8个分区发送数据,因此当您切换到b和b_grey时,Flink可能需要更长时间来处理所有分区的数据,从而导致水印推进困难。
为了缓解这个问题,您可以考虑以下几种方法:
这个问题可能是由于Flink在处理灰度切换时,时间戳推进不准确导致的。你可以尝试以下方法解决这个问题:
AssignerWithPeriodicWatermarks
接口自定义一个带有周期性水印的时间戳分配器,这样可以更好地控制水印的生成和推进。例如:import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
public class CustomTimestampAssigner implements AssignerWithPeriodicWatermarks<Row> {
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Row element, long previousElementTimestamp) {
// 提取事件时间戳并更新currentMaxTimestamp
long timestamp = element.getField(0); // 假设时间戳字段为第一个字段
currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// 生成当前的最大水印
long maxTimestamp = currentMaxTimestamp - 5000; // 假设允许的最大延迟为5秒
return new Watermark(maxTimestamp);
}
}
withTimestampAssigner
时,传入自定义的时间戳分配器:DataStream<Row> aStream = env.addSource(new FlinkKafkaConsumer<>(...))
.assignTimestampsAndWatermarks(new CustomTimestampAssigner())
.keyBy(...)
.map(...);
getCurrentWatermark
中的延迟阈值,以适应不同的数据流情况。版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。