写了个测试程序:
...... val tConfig = bstEnv.getConfigconfg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))......val q1=bstEnv.sqlQuery( """select createTime,feedid from source |where circleName is not null |and circleName not in('','') |and action = 'C_FEED_EDIT_SEND' |""".stripMargin) bstEnv.createTemporaryView("sourcefeed",q1) val q2=bstEnv.sqlQuery( """select feedid,postfeedid,action from source |where circleName is not null |and circleName not in('','') |and action in('C_PUBLISH','C_FORWARD_PUBLISH') |""".stripMargin)
bstEnv.createTemporaryView("postfeed",q2) bstEnv.sqlQuery( """ |select count(b.postfeedid) from |sourcefeed a |join postfeed b |on a.feedid=b.postfeedid """.stripMargin).toRetractStreamRow.print("") //------------------------------------程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理?
*来自志愿者整理的flink邮件归档
Join算子的state是支持清理的。 可以提供下以下信息: - Flink 版本 - planner (blink planner / old planner)
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。