请教各位一个问题,下面是我的集群配置: 1、我现在使用的是flink1.12版本; 2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群; 3、flink运行模式:Per-Job Cluster on yarn(三个节点,没每个节点48核64G内存); 4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:
#==============================================================================
#============================================================================== jobmanager.rpc.address: cdh1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 6144m
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助 #分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销) taskmanager.numberOfTaskSlots: 1
#当未在任何地方指定并行度时使用的默认并行性(默认值:1) parallelism.default: 1 #添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost #taskmanager.host: 0.0.0.0
#==============================================================================
#==============================================================================
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181 high-availability.zookeeper.client.acl: open high-availability.zookeeper.path.root: /flink #==============================================================================
#============================================================================== state.backend: rocksdb #选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异, #而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小, #而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项 state.backend.incremental: true #是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复 state.backend.local-recovery: true #RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB” state.backend.rocksdb.block.cache-size: 268435456 #这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB state.backend.rocksdb.timer-service.factory: HEAP
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
#保存点的默认目录。由状态后端用于将保存点写入文件系统 state.savepoints.dir: hdfs:///flink/flink-savepoints
state.checkpoints.num-retained: 3 #此选项指定作业计算如何从任务失败中恢复。可接受的值为: #'full':重新启动所有任务以恢复作业。 #“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。 jobmanager.execution.failover-strategy: region #==============================================================================
#==============================================================================
#==============================================================================
#============================================================================== #ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。 #重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接 yarn.application-attempts: 10 #yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects% #yarn.maximum-failed-containers: 100 #yarn.tags: flink
#==============================================================================
#============================================================================== heartbeat.timeout: 1800000
请教的问题:
通过 ./bin/flink run
-d -t yarn-per-job
-yjm 1536
-ytm 3072
-yD jobmanager.memory.process.size=1.5GB
-yD taskmanager.memory.process.size=3GB
-yD heartbeat.timeout=1800000
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar 这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?*来自志愿者整理的flink邮件归档
你好 根据你的建议我试了一下 将提交命令改为: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000 /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
jar包我使用了一个绝对路径: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
结果出现找不到jar包的异常: org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536 at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181] at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181] at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?] at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0] Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536 at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 8 more*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。