flink checkpoint 在 window 操作下 全局配置失效的问题。-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

flink checkpoint 在 window 操作下 全局配置失效的问题。

2018-11-09 12:13:37 5961 3

背景

  • flink 版本号 1.6.2
  • flink 集群模式 flink on yarn
  • 使用flink 读取kafka 数据 简单处理之后使用自定义richWindowFunction 处理数据的时候出现异常报错:
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
    at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
  • flink 关于 checkpoint 配置 :
fs.default-scheme: hdfs://@hadoop:9000/
fs.hdfs.hadoopconf: hdfs:///flink/data/
state.checkpoints.dir: hdfs:///flink/checkpoints/
state.checkpoints.num-retained: 20
state.savepoints.dir: hdfs:///flink/flink-savepoints/
state.backend.fs.checkpoint.dir: hdfs:///flink/state/checkpoints/

疑惑点:

全局设置 checkpoint 保存地址 ,那么window 操作的保存地址 应该也是该位置 .
但是为什么还是会将checkpoint 使用memory 方式?

尝试解决办法:

在代码层设置 checkpoint保存模式:

env.setStateBackend(new
FsStateBackend("hdfs:///flink/checkpoints/workFlowCheckpoint"));

解决前后对比 :

解决后hdfs 目录:

image

再次疑虑:

但是在1.6.2 版本 该类没设置为Deprecated ,求问 :
我这个解决办法是有什么不准确的方式么? 还是说 全局设置checkpoint 对于window 自身并没有生效?

取消 提交回答
全部回答(3)
  • 故乡的茶干
    2019-07-17 23:13:13
    已采纳

    你好,其实你是误解了 checkpoint 配置与checkpointStorage之间的关系。
    FsStateBackend <--> 使用 FsCheckpointStorage,每个FsStateBackend的state数据会写到DFS里面,返回的handle不包含实际数据,只是一个路径地址(除非实际数据的size小于state.backend.fs.memory-threshold,会存储在返回的hanlde里面)。
    MemoryStateBackend <--> 使用 MemoryBackendCheckpointStorage,每个MemoryStateBackend的state数据都会直接存储在返回的handle里面。
    如果你只配置了checkpoint path相关的配置,由于没有声明是FsStateBackend,导致会使用默认的MemoryStateBackend,从而所有的state都会直接返回给JM,当state数据量大的时候,就会因为超过阈值而报错(默认值5MB)

    1 0
  • godfreyhe
    2019-07-17 23:13:13

    如果没有在 flink-conf.yaml 里配置 state.backend 的值,或者通过api 显示设置 statebackend,默认为 heap 模式

    0 0
  • 健东
    2019-07-17 23:13:12

    不是很了解

    0 0
添加回答
相关问答

40

回答

[@徐雷frank][¥20]什么是JAVA的平台无关性

大河人家 2018-10-29 23:55:20 144712浏览量 回答数 40

162

回答

惊喜翻倍:免费ECS+免费环境配置~!(ECS免费体验6个月活动3月31日结束)

豆妹 2014-10-29 17:52:21 226134浏览量 回答数 162

8

回答

OceanBase 使用动画(持续更新)

mq4096 2019-02-20 17:16:36 336998浏览量 回答数 8

13

回答

[@饭娱咖啡][¥20]我想知道 Java 关于引用那一块的知识

心意乱 2018-10-31 18:44:12 142454浏览量 回答数 13

110

回答

OSS存储服务-客户端工具

newegg11 2012-05-17 15:37:18 295534浏览量 回答数 110

22

回答

爬虫数据管理【问答合集】

我是管理员 2018-08-10 16:37:41 147229浏览量 回答数 22

18

回答

阿里云开放端口权限

xcxx 2016-07-20 15:03:33 646764浏览量 回答数 18

31

回答

[@倚贤][¥20]刚学完html/css/js的新手学习servlet、jsp需要注意哪些问题?

弗洛伊德6 2018-10-27 21:52:43 146034浏览量 回答数 31

42

回答

【精品问答集锦】Python热门问题

小六码奴 2019-05-30 15:27:34 136951浏览量 回答数 42

22

回答

Flink Forward Asia 2021 有奖问答

阿里云实时计算Flink 2021-12-29 17:30:44 422457浏览量 回答数 22
+关注
千狼
常年在 javaWeb 大数据领域中 突破自我 ,希望可以有更多志同道合之士一起交流
12
文章
2
问答
问答排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载