开发者社区> 问答> 正文

flink1.11 可以使用processtime开窗,但是无法使用eventtime开窗

您好,请教一个问题,谢谢: 很简单的json, {"num":100,"ts":1595949526874,"vin":"DDDD"} {"num":200,"ts":1595949528874,"vin":"AAAA"} {"num":200,"ts":1595949530880,"vin":"CCCC"} {"num":300,"ts":1595949532883,"vin":"CCCC"} {"num":100,"ts":1595949534888,"vin":"AAAA"} {"num":300,"ts":1595949536892,"vin":"DDDD"} 我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。 public class FlinkKafka { public static void main(String[] args) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" + " ts BIGINT,\n" + " num INT ,\n" + " vin STRING ,\n" + " pts AS PROCTIME() , \n" + //处理时间 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'kkb',\n" + " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" + " 'properties.group.id' = 'mm',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'latest-offset' \n" + ")"; tableEnv.executeSql(kafkaSourceTable);

String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"; final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);

windowAllTable.printSchema(); tableEnv.toAppendStream(windowAllTable, Row.class).print(); System.out.println("------------------------------------------------------"); env.execute("job");

}

}


请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)" 如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。 打印结果: root |-- ts: BIGINT |-- num: INT |-- vin: STRING |-- pts: TIMESTAMP(3) NOT NULL PROCTIME |-- rowtime: TIMESTAMP(3) ROWTIME


11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29 7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27 7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31 12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33 11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37 2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35 11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39 1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43

但是如果我使用TUMBLE(rowtime, INTERVAL '5' SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。 版本是flink1.11.0

望指教,谢谢!*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 10:19:58 1034 0
1 条回答
写回答
取消 提交回答
  • 你指定时间语义是EventTime了吗 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);*来自志愿者整理的flink邮件归档

    2021-12-07 11:33:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载