Hi everyone in the community: In production I am currently running Flink 1.8.2, which has been fairly stable. To unify all of our related jobs under SQL, I have also been tracking the latest Flink releases, and so far I have evaluated two major versions: 1.10.1 and 1.11.1.
While trying them out, I noticed that when a job performs a SQL GROUP BY, the amount of data in the checkpoints keeps growing slowly. The state backend is RocksDB in incremental mode:

StateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints-data/", true);

I also set the cleanup policy described in the official docs (a consolidated sketch of this setup follows after the version details below):

TableConfig tableConfig = streamTableEnvironment.getConfig();
tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));
Am I using this the wrong way?
Looking at the detailed checkpoint information in the WebUI, the large state is mostly concentrated in the group operator.
Affected versions: Flink 1.10.1 and Flink 1.11.1
planner: blink planner
source: Kafka
time attribute: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
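For context, here is a minimal, self-contained sketch of the setup described so far. Everything mirrors the snippets above except the checkpoint interval, which is an assumed value for illustration; imports follow the 1.11 package layout.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StateSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Incremental RocksDB checkpoints, as in the original post.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/", true));
        env.enableCheckpointing(60_000L); // assumed interval; the post does not state one

        // Blink planner in streaming mode (Flink 1.10/1.11).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Idle state retention as in the post: state untouched for 2 minutes becomes
        // eligible for cleanup and is removed after at most 7 minutes. Note that this
        // setting targets unbounded (non-windowed) operators; TUMBLE window state is
        // instead released when the watermark passes the window end.
        TableConfig tableConfig = tEnv.getConfig();
        tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));
    }
}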
sql:
insert into result
select request_time
      ,request_id
      ,request_cnt
      ,avg_resptime
      ,stddev_resptime
      ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
from (
    select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
          ,commandId as request_id
          ,count(*) as request_cnt
          ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime
          ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime
    from log
    where commandId in (104005, 204005, 404005) and errCode = 0 and attr = 0
    group by TUMBLE(times, INTERVAL '1' MINUTE), commandId
    union all
    select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
          ,99999999
          ,count(*) as request_cnt
          ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime
          ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime
    from log
    where commandId in (104005, 204005, 404005) and errCode = 0 and attr = 0
    group by TUMBLE(times, INTERVAL '1' MINUTE)
)
source:
create table log (
    eventTime bigint
    ,times timestamp(3)
    ……………………
    ,commandId integer
    ,watermark for times as times - interval '5' second
) with (
    'connector' = 'kafka-0.10',
    'topic' = '……',
    'properties.bootstrap.servers' = '……',
    'properties.group.id' = '……',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
sink1:
create table result (
    request_time varchar
    ,request_id integer
    ,request_cnt bigint
    ,avg_resptime double
    ,stddev_resptime double
    ,insert_time varchar
) with (
    'connector' = 'kafka-0.10',
    'topic' = '……',
    'properties.bootstrap.servers' = '……',
    'properties.group.id' = '……',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)

*From the volunteer-curated Flink mailing-list archive
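For completeness, a minimal sketch of how these statements could be submitted, assuming the Flink 1.11 executeSql API and the tEnv from the earlier sketch; the String variables are hypothetical names standing for the DDLs and query shown above.

// Register source and sink from the DDL strings shown above.
tEnv.executeSql(sourceDdl);  // create table log ( ... )
tEnv.executeSql(sinkDdl);    // create table result ( ... )

// In Flink 1.11, executeSql() submits an INSERT job directly;
// no separate env.execute() call is needed for this statement.
tEnv.executeSql(insertSql);  // insert into result select ... union all ...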
I am not too familiar with the SQL side, but in my experience, when the state of a window operator keeps growing under event time, it is worth checking the watermark [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html

*From the volunteer-curated Flink mailing-list archive
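Following up on that pointer: a common cause is an idle Kafka partition, which prevents the overall watermark from advancing, so tumbling-window state is never released and checkpoints keep growing. The per-subtask watermarks shown in the WebUI (covered by the linked page) are the quickest way to confirm whether watermarks are stalled. If that turns out to be the cause, the blink planner can mark idle sources; a minimal sketch, assuming Flink 1.11 and the tEnv from the earlier sketch (the 30 s value is an arbitrary example):

// Mark a source subtask as temporarily idle after 30 s without records,
// so it stops holding back the watermark (blink planner option, Flink 1.11+).
tEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "30 s");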