开发者社区> 问答> 正文

checkpoint失败导致watermark不更新问题

大家好,请教一个问题: 场景是这样的,flink消费kafka,清洗后按分、时、天的维度(其中小时的聚合数据源来自分钟的聚合输出,天类似)进行聚合后sink es,但是经常会在job跑几个小时后,出现大量的checkpoint失败,同时watermark不更新的现象,中间多次调整checkpoint的相关参数,也参照了网上的相关分析(http://pangongsen.com/2018/04/25/Flink%E6%B0%B4%E4%BD%8D%E7%BA%BF%E4%B8%8D%E8%A7%A6%E5%8F%91%E7%9A%84%E5%87%A0%E7%82%B9%E6%80%BB%E7%BB%93/), 仍然没找到问题点,flink接触时间不长,有点束手无策了。 task manager log: 2020-09-30 15:03:19,708 INFO org.apache.hadoop.hdfs.DFSClient - Could not complete /flink/state/error/5d895e1de420b44791b0850c23004b0e/chk-41/5991fd64-9d2c-4ac2-b08f-859efba879ee retrying... 2020-09-30 15:13:25,236 WARN org.apache.hadoop.hdfs.DataStreamer - Slow waitForAckedSeqno took 38022ms (threshold=30000ms). File being written: /flink/state/infrastructure_dep/error/5d895e1de420b44791b0850c23004b0e/shared/57f53e0d-3d58-4a5a-893d-50667065d975, block: BP-1864147273-172.20.3.102-1555051764064:blk_2307424648_1233776433, Write pipeline datanodes: [DatanodeInfoWithStorage[172.20.1.61:1004,DS-e84bae2e-224a-4826-8f65-3a4bd3bd481a,DISK], DatanodeInfoWithStorage[172.20.1.27:1004,DS-c69e89a7-88cd-45f0-b261-57fe9fb1c7d8,DISK], DatanodeInfoWithStorage[172.20.1.24:1004,DS-cd0a08e6-ca5c-4d56-8c9d-fdd331d0b270,DISK]]. checkpoint相关代码: RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(stateBackendConfig.getCheckpointDataUri(), true);

rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend((StateBackend)rocksDBStateBackend); env.enableCheckpointing(5*60000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(900000); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000); env.getCheckpointConfig().setFailOnCheckpointingErrors(false); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); watermark使用的是BoundedOutOfOrdernessTimestampExtractor。

*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 12:28:46 770 0
1 条回答
写回答
取消 提交回答
  • hi,这种情况似乎像是反压造成的,数据流反压会导致算子来不及处理checkpoint事件,watermark消息也会因为反压无法发送到下游算子。

    建议观察下反压的情况[1],如果是这样的话,再针对反压源头进行优化处理。

    [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html*来自志愿者整理的flink

    2021-12-07 15:36:06
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
俞航翔|基于Log的通用增量Checkpoint 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载