开发者社区> 问答> 正文

pyflink 如何使用session window对相同pv数据聚合?

hi,all: 一账号一段时间内连续操作为一个pv,间隔时间超过阈值后会记为新的pv。系统需要获取流式日志,使用日志统计实时数据的各项指标。但是我们在使用session window的时候无法使用udaf(自定义聚合函数)对相同pv日志进行聚合统计。 希望知道的大佬能给点建议。感谢!!!

session_window = Session.with_gap("60.second").on("pv_time").alias("w") t_env.from_path('source')
.window(session_window)
.group_by("w,pv_id")
.select("pv_id,get_act(act)").insert_into("sink")

http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-01 15:35:32 660 0
1 条回答
写回答
取消 提交回答
  • 1.12 还不支持session window的udaf,在1.13上将提供这部分的支持,具体可以关注JIRA[1]。

    然后,1.12是支持ProcessFunction和KeyedProcessFunction的,具体可以参考代码[2]

    [1] https://issues.apache.org/jira/browse/FLINK-21630

    [2]

    https://github.com/apache/flink/blob/release-1.12/flink-python/pyflink/datastream/functions.py*来自志愿者整理的flink邮件归档

    2021-12-01 15:57:58
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
From Zero to Data Flow In Hours with Apache Nifi 立即下载
Get rid of traditional ETL, Move to Spark! 立即下载
Phoenix Search Index 功能与应用场景介绍 立即下载