开发者社区> 问答> 正文

请问大神们flink-sql可以指定时间清除内存中的全部State吗?

最近在做一个flink实时数仓项目,有个日报的场景,使用flink-on-yarn模式,如果job不停止,第二天的结果就在第一天的基础上累加了,我计算的读取规则(数据TimeStamp>current_date)就认为是今天的数据,可是如果集群不停止第二天的计算结果就会在第一天累加,代码中只设置了env.setStateBackend(new MemoryStateBackend()),目前我是每天重启一下job才可以释放内存中的State避免在昨天的基础上累计。我数据源是connector的upsert-kafka,然后基于dwd层编写sql。下面是我执行的具体sql,其中所用的表都来自dwd层的upsert-kafka数据源。 | select | TO_DATE(cast(doi.DeliveryTime as String),'yyyy-MM-dd') as days, | doi.UserId, | count(doi.Code) as SendTime, | sum(doi.PayAmount / 100) as SendCashcharge, | sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse, | sum(doi.CashMoney / 100)as SendCash | from dwd_order_info doi | where doi.DeliveryTime >cast(current_date AS TIMESTAMP) and doi.OrderType = 29 and doi.Status >= 50 and doi.Status <> 60 | group by TO_DATE(cast(doi.DeliveryTime as String),'yyyy-MM-dd'), doi.UserId*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-08 11:47:10 1179 0
1 条回答
写回答
取消 提交回答
  • 对的是我!

    为啥不用天级别的tumble window? 自动就帮你清楚 state 了*来自志愿者整理的flink邮件归档

    2021-12-08 17:10:08
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
SQL Server 2017 立即下载
云服务器ECS内存增强型实例re6全新发布 立即下载