开发者社区> 问答> 正文

请教Flink SQL watermark遇到未来时间的处理问题

您好! 我使用Flink SQL (Flink 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。

这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。

目前,我们这边处理的方法是: 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka topic,聚合任务再消费中间的kafka topic。

我想请教的是: 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。 2.不知是否有方法在一个任务中完成下面的两步操作: 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤 2)select sum(field2) from operatorTable group by TUMBLE(rowtime,INTERVAL '5' SECOND),field2 //这一步使用rowtime聚合输出 这种方法目前存在的问题是:在定义KafkaSource 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。

我连接Kafka的代码大概如下:

tEnv.connect( new Kafka()

.topic(topic)

.version(version)

.startFromLatest()

.properties(prop))

.withFormat(new Json()

.failOnMissingField(false)

.deriveSchema())

.withSchema(new Schema()

.schema(tableSchema.getTableSchema())

.rowtime(new Rowtime()

.timestampsFromField(rowTimeField)

.watermarksPeriodicBounded(delay)))

.inAppendMode()

.registerTableSink("KafkaSource");

以上,期待您帮忙解答,非常感谢~~*来自志愿者整理的flink邮件归档

展开
收起
雪哥哥 2021-12-07 15:58:12 1218 0
1 条回答
写回答
取消 提交回答
  • 通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法:

    1、在 Flink 从 Kafka 中消费数据后就进行 filter 部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5 分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题 2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5 分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替*来自志愿者整理的flink

    2021-12-07 16:27:32
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载