
Memory parameters passed via the CLI have no effect when submitting a job in YARN Per-Job Cluster Mode

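A question for everyone. Here is my cluster setup:
1. I'm using Flink 1.12.
2. A three-node Hadoop cluster built on CDH 6.3.2, using the YARN cluster bundled with CDH.
3. Flink runs in Per-Job Cluster mode on YARN (three nodes, each with 48 cores and 64 GB of memory).
4. Below is the flink-conf.yaml on my three nodes. Apart from jobmanager.rpc.address, the configuration is identical on all three Flink nodes: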

#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123

# The total process memory size for the JobManager.
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# Number of slots offered by the TaskManager (default: 1). Each slot can run one task or pipeline. Having multiple slots
# in a TaskManager helps amortize certain constant overheads (JVM, application libraries, or network connections) across parallel tasks or pipelines.
taskmanager.numberOfTaskSlots: 1

# The parallelism used for programs that did not specify and other parallelism.
# Default parallelism used when no parallelism is specified anywhere (default: 1).
parallelism.default: 1
# Add the following setting to specify the TaskManager address; for a single-machine deployment, use localhost.
#taskmanager.host: 0.0.0.0

# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink

#==============================================================================

Fault tolerance、checkpointing and state backends 容错能力、检查点和状态后端

#============================================================================== state.backend: rocksdb #选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异, #而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小, #而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项 state.backend.incremental: true #是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复 state.backend.local-recovery: true #RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB” state.backend.rocksdb.block.cache-size: 268435456 #这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB state.backend.rocksdb.timer-service.factory: HEAP

Directory for checkpoints filesystem, when using any of the default bundled

state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录

state.checkpoints.dir: hdfs:///flink/flink-checkpoints

Default target directory for savepoints, optional.

#保存点的默认目录。由状态后端用于将保存点写入文件系统 state.savepoints.dir: hdfs:///flink/flink-savepoints

要保留的最大已完成检查点数

state.checkpoints.num-retained: 3 #此选项指定作业计算如何从任务失败中恢复。可接受的值为: #'full':重新启动所有任务以恢复作业。 #“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。 jobmanager.execution.failover-strategy: region #==============================================================================

Advanced

#==============================================================================

Override the directories for temporary files. If not specified, the

system-specific Java temporary directory (java.io.tmpdir property) is taken.

For framework setups on Yarn or Mesos, Flink will automatically pick up the

containers' temp directories without any need for configuration.

Add a delimited list for multiple directories, using the system directory

delimiter (colon ':' on unix) or a comma, e.g.:

/data1/tmp:/data2/tmp:/data3/tmp

Note: Each directory entry is read from and written to by a different I/O

thread. You can include the same directory multiple times in order to create

multiple I/O threads against that directory. This is for example relevant for

high-throughput RAIDs.

io.tmp.dirs: /tmp

The classloading resolve order. Possible values are 'child-first' (Flink's default)

and 'parent-first' (Java's default).

Child first classloading allows users to use different dependency/library

versions in their application than those in the classpath. Switching back

to 'parent-first' may help with debugging dependency issues.

classloader.resolve-order: child-first

The amount of memory going to the network stack. These numbers usually need

no tuning. Adjusting them may be necessary in case of an "Insufficient number

of network buffers" error. The default min is 64MB, the default max is 1GB.

taskmanager.memory.network.fraction: 0.1

taskmanager.memory.network.min: 64mb

taskmanager.memory.network.max: 1gb

#==============================================================================
# YARN Configuration
#==============================================================================
# Number of ApplicationMaster restart attempts. By default this is 1; if high availability is enabled, the default is 2.
# The number of restarts is also limited by YARN (configured via yarn.resourcemanager.am.max-attempts). Note that the
# whole Flink cluster restarts and the YARN client loses its connection.
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink

#==============================================================================
# HistoryServer
#==============================================================================
heartbeat.timeout: 1800000

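My question:

After submitting the Flink job with the following command:

    ./bin/flink run \
      -d -t yarn-per-job \
      -yjm 1536 \
      -ytm 3072 \
      -yD jobmanager.memory.process.size=1.5GB \
      -yD taskmanager.memory.process.size=3GB \
      -yD heartbeat.timeout=1800000 \
      /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar

the memory parameters specified in the command are not used. In the Flink Web UI, the memory in use is the size configured in flink-conf.yaml; the values given on the command line have no effect. Am I using this incorrectly?

*From the Flink mailing-list archive compiled by volunteers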

Asked by 毛毛虫雨 on 2021-12-08 12:14:06

1 answer
  • Hi, following your suggestion I tried again and changed the submit command to:

        ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000 /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar

    I used an absolute path for the JAR: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar

    Now it fails with an exception saying the JAR cannot be found:

        org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536
            at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
            at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
            at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
            at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
            at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
            at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?]
            at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
            at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
        Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
            at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
            at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
            ... 8 more

    *From the Flink mailing-list archive compiled by volunteers

    2021-12-08 18:16:35
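A minimal sketch of a corrected submission, resting on assumptions worth checking against the Flink 1.12 CLI documentation. With `-t yarn-per-job` the generic CLI is active and, as far as I understand, only `-D` dynamic properties are applied, so the `-yjm`/`-ytm`/`-yD` options in the original command are ignored and the flink-conf.yaml values win. The `JAR file does not exist: 1536` error appears to have a different cause: `flink run` has no `-tm` or `-jm` option, so `-tm` seems to be parsed as `-t m`, leaving `1536` as the first positional argument, which `flink run` takes as the JAR path. The sketch therefore drops `-tm`/`-jm` and passes memory only via `-D`, written as an integer plus a unit (`1536m` rather than `1.5GB`), since, to my knowledge, Flink's memory-size parser does not accept fractional values. It uses the asker's intended sizes (1.5 GB JobManager, 3 GB TaskManager), builds the argument list with POSIX `set --`, and prints the command instead of running it.

```shell
#!/bin/sh
# Sketch (assumption): Flink 1.12 with the generic CLI (-t yarn-per-job), which
# reads configuration from -D options; -yjm/-ytm/-yD are not applied in this mode.
JAR=/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar

# Build the arguments for ./bin/flink as positional parameters.
# No -tm/-jm: they are not "flink run" options and shift the JAR argument.
# Memory sizes are integers with a unit; "1.5GB" is avoided on purpose.
set -- run -d -t yarn-per-job \
  -Djobmanager.memory.process.size=1536m \
  -Dtaskmanager.memory.process.size=3072m \
  -Dheartbeat.timeout=1800000 \
  "$JAR"

# Dry run: print the command. To submit for real, run from the Flink
# installation directory and replace "echo ./bin/flink" with "./bin/flink".
echo ./bin/flink "$@"
```

Alternatively, keeping the `-y*` options should also take effect if the target is selected with `-m yarn-cluster` instead of `-t yarn-per-job`, since, as far as I know, those options belong to the YARN-specific CLI that `-m yarn-cluster` activates; mixing the two styles in one command is what leaves the `-y*` values unused.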