大家好,请教一个问题: 场景是这样的,flink消费kafka,清洗后按分、时、天的维度(其中小时的聚合数据源来自分钟的聚合输出,天类似)进行聚合后sink es,但是经常会在job跑几个小时后,出现大量的checkpoint失败,同时watermark不更新的现象,中间多次调整checkpoint的相关参数,也参照了网上的相关分析(http://pangongsen.com/2018/04/25/Flink%E6%B0%B4%E4%BD%8D%E7%BA%BF%E4%B8%8D%E8%A7%A6%E5%8F%91%E7%9A%84%E5%87%A0%E7%82%B9%E6%80%BB%E7%BB%93/), 仍然没找到问题点,flink接触时间不长,有点束手无策了。 task manager log: 2020-09-30 15:03:19,708 INFO org.apache.hadoop.hdfs.DFSClient - Could not complete /flink/state/error/5d895e1de420b44791b0850c23004b0e/chk-41/5991fd64-9d2c-4ac2-b08f-859efba879ee retrying... 2020-09-30 15:13:25,236 WARN org.apache.hadoop.hdfs.DataStreamer - Slow waitForAckedSeqno took 38022ms (threshold=30000ms). File being written: /flink/state/infrastructure_dep/error/5d895e1de420b44791b0850c23004b0e/shared/57f53e0d-3d58-4a5a-893d-50667065d975, block: BP-1864147273-172.20.3.102-1555051764064:blk_2307424648_1233776433, Write pipeline datanodes: [DatanodeInfoWithStorage[172.20.1.61:1004,DS-e84bae2e-224a-4826-8f65-3a4bd3bd481a,DISK], DatanodeInfoWithStorage[172.20.1.27:1004,DS-c69e89a7-88cd-45f0-b261-57fe9fb1c7d8,DISK], DatanodeInfoWithStorage[172.20.1.24:1004,DS-cd0a08e6-ca5c-4d56-8c9d-fdd331d0b270,DISK]]. checkpoint相关代码: RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(stateBackendConfig.getCheckpointDataUri(), true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend((StateBackend)rocksDBStateBackend); env.enableCheckpointing(5*60000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(900000); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000); env.getCheckpointConfig().setFailOnCheckpointingErrors(false); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); watermark使用的是BoundedOutOfOrdernessTimestampExtractor。
*来自志愿者整理的flink邮件归档
hi,这种情况似乎像是反压造成的,数据流反压会导致算子来不及处理checkpoint事件,watermark消息也会因为反压无法发送到下游算子。
建议观察下反压的情况[1],如果是这样的话,再针对反压源头进行优化处理。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。