您好! 我使用Flink SQL (Flink 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。
这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。
目前,我们这边处理的方法是: 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka topic,聚合任务再消费中间的kafka topic。
我想请教的是: 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。 2.不知是否有方法在一个任务中完成下面的两步操作: 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤 2)select sum(field2) from operatorTable group by TUMBLE(rowtime,INTERVAL '5' SECOND),field2 //这一步使用rowtime聚合输出 这种方法目前存在的问题是:在定义KafkaSource 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。
我连接Kafka的代码大概如下:
tEnv.connect( new Kafka()
.topic(topic)
.version(version)
.startFromLatest()
.properties(prop))
.withFormat(new Json()
.failOnMissingField(false)
.deriveSchema())
.withSchema(new Schema()
.schema(tableSchema.getTableSchema())
.rowtime(new Rowtime()
.timestampsFromField(rowTimeField)
.watermarksPeriodicBounded(delay)))
.inAppendMode()
.registerTableSink("KafkaSource");
以上,期待您帮忙解答,非常感谢~~*来自志愿者整理的flink邮件归档
通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法:
1、在 Flink 从 Kafka 中消费数据后就进行 filter 部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5 分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题 2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5 分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。