第一种【yarn-session.sh(开辟资源)+flink run(提交任务)】
启动一个一直运行的flink集群 ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
附着到一个已存在的
flink yarn session ./bin/yarn-session.sh -id application_1463870264508_0029 执行任务
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt
停止任务 【web界面或者命令行执行cancel命令】
: Cannot fulfill the minimum memory requirements with the provided cluster specification. Please increase the memory of the cluster. at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:453) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:475) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:413) ... 7 more Caused by: java.lang.IllegalArgumentException: The configuration value 'containerized.heap-cutoff-min'='600' is larger than the total container memory 512 at org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters.calculateCutoffMB(ContaineredTaskManagerParameters.java:133) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.validateClusterSpecification(AbstractYarnClusterDescriptor.java:450) ... 9 more ------------------------------------------------------------ The program finished with the following exception: java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1769) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:811) Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:608) 修改etc/hadoop/yarn-site.xml <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
第二种【flink run -m yarn-cluster(开辟资源+提交任务)】
启动集群,执行任务 ./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar
注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败
./bin/yarn-session.sh 命令分析
./bin/flink run 命令分析
run [OPTIONS] <jar-file> <arguments>
"run" 操作参数:
-c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager
-p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。
默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】: ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
连接指定host和port的jobmanager: ./bin/flink run -m hadoop100:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
启动一个新的yarn-session: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀 例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
]# ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt 2021-08-11 09:26:34,043 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at hadoop100/192.168.56.10:8032 2021-08-11 09:26:34,241 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2021-08-11 09:26:34,241 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2021-08-11 09:26:34,689 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1} 2021-08-11 09:26:35,516 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/data/soft/flink-1.6.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them. 2021-08-11 09:26:41,539 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1628560301758_0001 2021-08-11 09:26:41,882 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1628560301758_0001 2021-08-11 09:26:41,882 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated 2021-08-11 09:26:41,888 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED 2021-08-11 09:26:52,798 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully. Starting execution of program Program execution finished Job with JobID c31cf3c932a3f2b7eefc8d4d926d9345 has finished. Job Runtime: 21323 ms
Flink on yarn内部实现