在使用窗口的过程中遇到一个问题,麻烦大家帮忙看下! 简单描述下情况:我们是从kafka获取数据,在flink做一些相关处理后sink到elasticsearch中。没有使用window的时候没有问题,可以成功完成流程。使用窗口后报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
fsTableEnv.connect(new Kafka() .version("universal") .topic("jes_topic_evtime") .property("zookeeper.connect", "172.xx.xx.xxx:2181") .property("bootstrap.servers", "172.xx.xx.xxx:9092") .property("group.id", "grp1") .startFromEarliest() ).withFormat(new Json() .failOnMissingField(false).deriveSchema()) .withSchema(new Schema().field("acct", "STRING").field("evtime", "LONG").field("logictime","TIMESTAMP(3)").rowTime(new Rowtime().timestampsFromField("evtime").watermarksPeriodicBounded(5000))) .inAppendMode().createTemporaryTable("testTableName");
Table testTab = fsTableEnv.sqlQuery("SELECT acct, evtime, logictime FROM testTableName") .window(Tumble.over("5.seconds").on("logictime").as("w1")) .groupBy("w1, acct") .select("w1.rowtime, acctno");
测试发现在descriptor连接kafka时定义schema时,定义的rowtime字段和使用from的方式重命名字段好像都无法成功。测试时使用from方式重命名字段返回的值是null
*来自志愿者整理的flink邮件归档
报错的原因这个字段在你原始的表中不存在的,理解你的需求是你想用 field evitime(Long型)生成一个新的 field logictime(TIMESTAMP(3)),这个可以用计算列解决,Table API上还不支持计算列,1.12 已经在开发中了。你可以用 DDL 加计算列完成满足你的需求,参考[1]
create table test ( acct STRING, evitime BIGINT, logictime as TO_TIMESTAMP(FROM_UNIXTIME(evitime)), WATERMARK FOR logictime AS logictime - INTERVAL ‘5’ SECOND, ) with( ... )
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。