开发者社区> 问答> 正文

Flink SQL是否支持Count Window函数?

今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time window,问一下官方是否打算sql支持count window呢? 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-02 16:59:37 1155 0
1 条回答
写回答
取消 提交回答
  • 据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。

    不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为

    datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。

    EnvironmentSettings settings = EnvironmentSettings.newInstance().

    inStreamingMode().build();

    StreamTableEnvironment tEnv =

    StreamTableEnvironment.create(

    StreamExecutionEnvironment.getExecutionEnvironment(), settings);

    tEnv.executeSql(

    "CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH

    ( 'connector' = 'datagen' )");

    Table table = tEnv.sqlQuery("SELECT key, val FROM T");

    DataStream dataStream = tEnv.toDataStream(table);

    DataStream<Tuple2<Integer, Integer>> summedStream =

    dataStream

    .keyBy(row -> (int) row.getField(0))

    .countWindow(100)

    .apply(

    (WindowFunction<

    Row,

    Tuple2<Integer, Integer>,

    Integer,

    GlobalWindow>)

    (key, window, input, out) -> {

    int sum = 0;

    for (Row row : input) {

    Integer field = (Integer) row.getField(1);

    if (field != null) {

    sum += field;

    }

    }

    out.collect(Tuple2.of(key, sum));

    })

    .returns(

    new TupleTypeInfo<>(

    BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));

    Table summedTable = tEnv.fromDataStream(summedStream);

    tEnv.registerTable("S", summedTable);

    tEnv.executeSql("SELECT f0, f1 FROM S").print();

    *来自志愿者整理的FLINK邮件归档

    2021-12-02 17:12:40
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载