今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time window,问一下官方是否打算sql支持count window呢? 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!*来自志愿者整理的flink邮件归档
据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。
不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为
datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。
EnvironmentSettings settings = EnvironmentSettings.newInstance().
inStreamingMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment(), settings);
tEnv.executeSql(
"CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH
( 'connector' = 'datagen' )");
Table table = tEnv.sqlQuery("SELECT key, val FROM T");
DataStream dataStream = tEnv.toDataStream(table);
DataStream<Tuple2<Integer, Integer>> summedStream =
dataStream
.keyBy(row -> (int) row.getField(0))
.countWindow(100)
.apply(
(WindowFunction<
Row,
Tuple2<Integer, Integer>,
Integer,
GlobalWindow>)
(key, window, input, out) -> {
int sum = 0;
for (Row row : input) {
Integer field = (Integer) row.getField(1);
if (field != null) {
sum += field;
}
}
out.collect(Tuple2.of(key, sum));
})
.returns(
new TupleTypeInfo<>(
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
Table summedTable = tEnv.fromDataStream(summedStream);
tEnv.registerTable("S", summedTable);
tEnv.executeSql("SELECT f0, f1 FROM S").print();
*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。