开发者社区> 问答> 正文

BUG :DataStream 转 Table 后无法 触发窗口计算怎么办

1:当DataStream是由 一个table 经过 group by rowtime 转换过来的就无法触发窗口

例如: tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime"));

// 获得 DataStream,并定义wtm生成 SingleOutputStreamOperator r = tableEnv.toRetractStream(tableEnv.from("test3"), Row.class) .filter(x -> x.f0) // map ........ .returns(Types.TUPLE(Types.STRING, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) .withTimestampAssigner(((element, recordTimestamp) -> element.f1)) );

参考 官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html

// stream - 转 Table,指定Rowtime tableEnv.createTemporaryView("test5", r, $("msg"), $("rowtime").rowtime());

String sql5 = "select " + "msg," + "count(1) cnt" + " from test5 " + " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " + ""; tableEnv.executeSql("insert into printlnRetractSink " + sql5);

结果: 无法触发窗口操作。 查调试源码: org.apache.flink.table.runtime.operators.window.WindowOperator // 返回的wtm永远都是 -9223372036854775808 public long getCurrentWatermark() { return internalTimerService.currentWatermark(); }

// 查看任务,watermark是正常在生成的。InternalTimerServiceImpl.advanceWatermark是正常为currentWatermark赋值。但是 internalTimerService.currentWatermark() 却拿的是-9223372036854775808

// 当 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime")); 语句改为 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test"));

结果就是正确的。 所以这是一个bug吗??*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-01 15:32:08 764 0
1 条回答
写回答
取消 提交回答
  • 你好, 你的flink版本是多少? 之前有个bug是Table转datastream 会丢rowtime问题,看起来是这个问题。

    我在[1]里修复了,你可以升级对应的版本试下。

    祝好, Leonard [1]https://issues.apache.org/jira/browse/FLINK-21013 https://issues.apache.org/jira/browse/FLINK-21013*来自志愿者整理的flink邮件归档

    2021-12-01 15:54:15
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载