开发者社区> 问答> 正文

Flink Table / SQL API:在会话窗口聚合后修改rowtime属性

是否可以rowtime在第一次session聚合后修改属性,使其等于.rowtime会话中最后一次观察到的事件?

我正在尝试做这样的事情:

table
.window(Session withGap 2.minutes on 'rowtime as 'w)
.groupBy('w, 'userId)
.select(

'userId,
('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
('w.rowtime - 2.minutes) as 'rowtime

)
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy('w)
.select(

'w.start,
'w.end,
'sessionDuration.avg as 'avgSession,
'sessionDuration.count as 'numberOfSession

)
关键部分是:

('w.rowtime - 2.minutes) as 'rowtime
所以我想重新分配.rowtime会话中最新事件的记录,没有会话间隙(2.minutes在本例中)。

这在BatchTable中工作正常,但它在StreamTable中不起作用:

Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.
是的,我知道,我觉得我不想发明时间机器并改变时间顺序。但实际上是否可能以某种方式实现所描述的行为?

展开
收起
flink小助手 2018-12-06 17:25:58 9486 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    你不能在当前版本(1.6.0)中使用SQL或Table API来做到这一点。一旦修改了时间属性(rowtime或proctime),它就会成为常规TIMESTAMP属性并失去其特殊时间特征。

    对于rowtime属性,原因是我们无法保证时间戳仍与水印对齐。原则上,我们可以通过减去的时间间隔来延迟水印,但是现在还不支持。

    2019-07-17 23:18:37
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载