最近在做一个flink实时数仓项目,有个日报的场景,使用flink-on-yarn模式,如果job不停止,第二天的结果就在第一天的基础上累加了,我计算的读取规则(数据TimeStamp>current_date)就认为是今天的数据,可是如果集群不停止第二天的计算结果就会在第一天累加,代码中只设置了env.setStateBackend(new MemoryStateBackend()),目前我是每天重启一下job才可以释放内存中的State避免在昨天的基础上累计。我数据源是connector的upsert-kafka,然后基于dwd层编写sql。下面是我执行的具体sql,其中所用的表都来自dwd层的upsert-kafka数据源。 | select | TO_DATE(cast(doi.DeliveryTime as String),'yyyy-MM-dd') as days, | doi.UserId, | count(doi.Code) as SendTime, | sum(doi.PayAmount / 100) as SendCashcharge, | sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse, | sum(doi.CashMoney / 100)as SendCash | from dwd_order_info doi | where doi.DeliveryTime >cast(current_date AS TIMESTAMP) and doi.OrderType = 29 and doi.Status >= 50 and doi.Status <> 60 | group by TO_DATE(cast(doi.DeliveryTime as String),'yyyy-MM-dd'), doi.UserId*来自志愿者整理的flink邮件归档
为啥不用天级别的tumble window? 自动就帮你清楚 state 了*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。