开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问flinkSQL做分组聚合是,怎么实现回撤呢?不带窗口的聚合运算,始终无法实现回撤,大家是怎么处

请问flinkSQL做分组聚合是,怎么实现回撤呢?不带窗口的聚合运算,始终无法实现回撤,大家是怎么处理的呢?

展开
收起
十一0204 2023-04-11 09:46:29 309 0
2 条回答
写回答
取消 提交回答
  • 值得去的地方都没有捷径

    Flink SQL在不带窗口的聚合运算中实现回撤需要使用Flink的事件时间(Event Time)特性以及状态变量。

    事件时间是根据事件本身产生时间来计算的,而不是依据Flink接收事件的时间。在Flink SQL中,可以使用rowtime来指定事件时间的字段。

    接下来,需要使用状态变量存储先前的聚合结果,在后续数据的到来时,计算新的聚合结果,然后用新的结果替换旧的结果。同时,对于旧的结果也需要更新,以便下一次撤回操作能够得到正确的结果。

    具体实现步骤如下:

    在CREATE TABLE语句中,使用ROWTIME(或PROCTIME)语义来定义事件时间字段: CREATE TABLE myTable ( id INT, value BIGINT, row_time TIMESTAMP(3), WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND -- 定义水印 ) WITH ( 'connector.type' = 'kafka', ... ) 在查询中,使用事件时间窗口(TUMBLE、HOP或SESSION)进行聚合: SELECT id, SUM(value) FROM myTable GROUP BY id, TUMBLE(row_time, INTERVAL '1' MINUTE) 计算新的聚合结果,并更新状态变量: public class MyAggregateFunction extends AggregateFunction<Long, Tuple2<Long, Boolean>> {

    @Override public Tuple2<Long, Boolean> createAccumulator() { return Tuple2.of(0L, false); }

    @Override public Tuple2<Long, Boolean> add(Long value, Tuple2<Long, Boolean> accumulator) { return Tuple2.of(accumulator.f0 + value, true); }

    @Override public Long getResult(Tuple2<Long, Boolean> accumulator) { return accumulator.f0; }

    @Override public Tuple2<Long, Boolean> merge(Tuple2<Long, Boolean> acc1, Tuple2<Long, Boolean> acc2) { if (acc1.f1 && !acc2.f1) { // acc1 变成旧的结果 acc1.f0 -= acc2.f0; // 回撤 } else if (!acc1.f1 && acc2.f1) { // acc2 变成旧的结果 acc2.f0 -= acc1.f0; // 回撤 } // 更新状态变量为最新的结果 return Tuple2.of(acc1.f0 + acc2.f0, true); } } 将聚合函数应用到查询中: SELECT id, MyAggregateFunction(value) FROM myTable GROUP BY id 在使用Flink SQL进行分组聚合时,根据具体业务需求,可以在SQL语句中加入OVER PARTITION BY或OVER ORDER BY等其他语法,以满足更细粒度的需求。

    2023-04-17 16:40:17
    赞同 展开评论 打赏
  • 坚持这件事孤独又漫长。
    • 如果要实现回撤,需要使用带有时间滚动窗口的分组聚合操作。首先,必须使用如下代码开启Checkpoint:
    env.enableCheckpointing(1000); // 开启checkpoint
    
    • 然后需要设置窗口时间以及触发条件:
    SELECT
      TUMBLE_START(rowTime, INTERVAL '5' SECOND) as window_start,
      COUNT(id)
    FROM
       myTable
    GROUP BY
       TUMBLE(rowTime, INTERVAL '5' SECOND)
    
    • 这里的TUMBLE_START函数指定窗口的开始时间,TUMBLE函数指定窗口的大小和滑动步长。Flink会按照窗口的大小将数据进行分组,并计算每个窗口内每个组的聚合结果。当一个窗口结束时,Flink会将聚合结果输出到下游任务中。

    • 同时,当执行完聚合结果时,Flink会将当前状态进行checkpoint,以备回撤。接下来,如果有新数据进来,Flink会根据时间戳将其分配到对应的窗口中进行聚合,并将新的结果输出到下游任务。如果需要进行回撤,则可以通过之前checkpoint保存的状态,将之前的聚合结果重新计算,保证数据的准确性和一致性。

    2023-04-11 10:40:55
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载