4.3 Test and Verify
1. Open the WebUI
2. Run WordCount (wc)
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
3. Kill one of the masters
4. Run WordCount again; it still executes normally
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
5. Stop the cluster
/export/server/flink/bin/stop-cluster.sh
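The failover check in step 3 can be scripted. The following is a minimal sketch, assuming the standalone JobManager appears in `jps` output under the class name `StandaloneSessionClusterEntrypoint` (confirm this against your own `jps` output first). The helper name `jobmanager_pids` is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: pick the JobManager PID(s) out of `jps` output.
# Assumption: the standalone master process is listed as
# StandaloneSessionClusterEntrypoint; adjust if your jps shows another name.
jobmanager_pids() {
  # Reads jps-style lines ("<pid> <main class>") from stdin
  # and prints the PID of every matching line.
  awk '$2 == "StandaloneSessionClusterEntrypoint" { print $1 }'
}

# Typical use on one master node (left as comments so that sourcing
# this file has no side effects):
#   jps | jobmanager_pids | xargs -r kill -9
#   /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
```

Keeping the parsing in a function that reads stdin makes it easy to try on captured `jps` output before pointing it at a live master.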
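05 Flink on YARN Mode
5.1 Advantages of Using YARN
In real-world development, Flink is most often run in Flink on YARN mode, for the following reasons:
Reason 1: YARN resources can be used on demand, which improves cluster resource utilization.
Reason 2: YARN jobs have priorities, and jobs run according to priority.
Reason 3: Because it builds on the YARN scheduling system, failover of each role is handled automatically:
- The JobManager and TaskManager processes are both monitored by the YARN NodeManager.
- If the JobManager process exits abnormally, the YARN ResourceManager reschedules the JobManager onto another machine.
- If a TaskManager process exits abnormally, the JobManager is notified and requests resources from the YARN ResourceManager again to restart the TaskManager.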
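5.2 How It Works
The workflow is as follows:
1. The Client uploads the jar and configuration files to the HDFS cluster.
2. The Client submits the job to the YARN ResourceManager and requests resources.
3. The ResourceManager allocates Container resources and starts the AppMaster (ApplicationMaster).
- The AppMaster then loads the Flink jar and configuration to build the environment and starts the JobManager. The JobManager and the ApplicationMaster run in the same container.
- Once they have started successfully, the AppMaster knows the JobManager's address (its own host). It generates a new Flink configuration file for the TaskManagers so that they can connect to the JobManager; this file is also uploaded to HDFS.
- In addition, the AppMaster container serves Flink's web interface. All ports that YARN allocates are ephemeral ports, which allows users to run multiple Flink instances in parallel.
4. The ApplicationMaster requests worker resources from the ResourceManager; the NodeManager loads the Flink jar and configuration, builds the environment, and starts the TaskManager.
5. After starting, the TaskManager sends heartbeats to the JobManager and waits for the JobManager to assign it tasks.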
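To observe step 1 from the command line, one can list what the client uploaded. This is a rough sketch only: it assumes the client stages its files under `.flink/<application id>` inside the submitting user's HDFS home directory, and that the home directory is `/user/<name>`. Both are assumptions to verify on your cluster, and `flink_staging_dir` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: build the HDFS path where the uploaded jar and config are expected.
# Assumptions (not taken from the article): HDFS home is /user/<user> and
# Flink stages files under .flink/<application id> inside it.
flink_staging_dir() {
  local user="$1" app_id="$2"
  printf '/user/%s/.flink/%s\n' "$user" "$app_id"
}

# Example (commented out; needs a running HDFS and a real application id):
#   hdfs dfs -ls "$(flink_staging_dir root application_1599402747874_0001)"
```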
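5.3 Two Modes
5.3.1 Session Mode
Characteristics: resources must be requested in advance, and the JobManager and TaskManagers are started up front.
Advantage: a job submission does not have to request resources each time; it uses the resources already allocated, which improves execution efficiency.
Disadvantage: resources are not released after a job finishes, so they keep occupying the system.
Use cases: frequent job submissions and workloads with many small jobs.
5.3.2 Per-Job Mode
Characteristics: every job submission requests resources once.
Advantage: resources are released as soon as the job finishes and do not keep occupying the system.
Disadvantage: every submission has to request resources, which takes time and slows execution down.
Use cases: infrequent job submissions and large jobs.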
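The difference between the two modes shows up in how a job is submitted. The sketch below only prints the command for each mode; the flags are the ones this article uses in section 5.5, while the helper name `submit_cmd` and its arguments are made up for illustration.

```shell
#!/usr/bin/env bash
# Sketch: print the submit command for a given mode (nothing is executed).
FLINK=/export/server/flink/bin/flink

submit_cmd() {
  local mode="$1" jar="$2"
  case "$mode" in
    session)
      # Session mode: a yarn-session is already running, so a plain
      # `flink run` reuses the resources it holds.
      echo "$FLINK run $jar" ;;
    per-job)
      # Per-Job mode: each submission asks YARN for its own resources.
      echo "$FLINK run -m yarn-cluster -yjm 1024 -ytm 1024 $jar" ;;
    *)
      echo "unknown mode: $mode" >&2
      return 1 ;;
  esac
}
```

For example, `submit_cmd per-job /export/server/flink/examples/batch/WordCount.jar` prints the Per-Job command line, which can be reviewed before running it.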
5.4 Installation and Deployment
step1: Disable YARN's memory checks
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
Add the following:
<!-- Disable YARN memory checks -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
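Notes:
- These settings control whether a thread is started to check the amount of memory each task is using (pmem for physical memory, vmem for virtual memory). If a task exceeds its allocation, it is killed outright. The default is true.
- We need to turn the checks off here because Flink on YARN can easily exceed the memory limits, at which point YARN automatically kills the job.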
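Before restarting YARN it can help to confirm that both switches really are set to false. A minimal sketch using only `tr`, `grep` and `sed`; the function name `yarn_prop` is hypothetical, and the parsing assumes each `<name>` is immediately followed by its `<value>`, as in the snippet above.

```shell
#!/usr/bin/env bash
# Sketch: print the <value> of one property from a Hadoop-style XML file.
# Assumption: <value> directly follows <name> (only whitespace in between).
yarn_prop() {
  local file="$1" name="$2"
  # Join the file into one line so multi-line and single-line
  # <property> blocks are handled the same way.
  tr -d '\n' < "$file" \
    | grep -o "<name>${name}</name>[[:space:]]*<value>[^<]*</value>" \
    | sed -e 's/.*<value>//' -e 's/<\/value>.*//'
}

# Example (commented out; needs the real file):
#   yarn_prop /export/server/hadoop/etc/hadoop/yarn-site.xml yarn.nodemanager.vmem-check-enabled
```

An empty result means the property is not set in the file, in which case YARN falls back to its default.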
step2: Sync the file to the other nodes
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml
step3: Restart YARN
/export/server/hadoop/sbin/stop-yarn.sh
/export/server/hadoop/sbin/start-yarn.sh
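With more than two worker nodes, the sync step is easier to keep correct as a loop. The sketch below only prints one `scp` command per node so the list can be reviewed first; `sync_cmds` is a hypothetical helper, and the node names are the ones used in this article.

```shell
#!/usr/bin/env bash
# Sketch: print one scp command per target node (nothing is copied here).
CONF=/export/server/hadoop/etc/hadoop/yarn-site.xml

sync_cmds() {
  local node
  for node in "$@"; do
    echo "scp $CONF $node:$CONF"
  done
}

# To actually copy the file, pipe the printed commands to a shell:
#   sync_cmds node2 node3 | sh
```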
5.5 Test and Verify
5.5.1 Session Mode
yarn-session.sh (allocates the resources) + flink run (submits the job)
1. Start a Flink session on YARN by running the following command on node1
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
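Note: this requests 2 CPUs and 1600 MB of memory
# -n  number of containers to request (2 here), i.e. the number of TaskManagers
# -tm memory size of each TaskManager
# -s  number of slots per TaskManager
# -d  run in the background (detached)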
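The "2 CPUs, 1600 MB" figure follows directly from the flags: total TaskManager memory is `-n` times `-tm`, and total slots is `-n` times `-s`. A small sketch of that arithmetic, assuming one slot corresponds to one CPU and counting only the TaskManagers (the JobManager container is not included); `session_totals` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: total TaskManager memory and slots for a yarn-session request.
# Assumption: one slot maps to one CPU; JobManager memory is not counted.
session_totals() {
  local n="$1" tm="$2" s="$3"   # -n containers, -tm MB per TaskManager, -s slots each
  echo "memory_mb=$(( n * tm )) slots=$(( n * s ))"
}

session_totals 2 800 1   # prints: memory_mb=1600 slots=2
```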
You can ignore the following warning:
WARN org.apache.hadoop.hdfs.DFSClient - Caught exception
java.lang.InterruptedException
2. View the UI: http://node1:8088/cluster
3. Submit a job with flink run:
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
Once it finishes, you can go on to run other small jobs
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
4. From the ApplicationMaster link on that page you can open the Flink management UI.
5. Shut down the yarn-session:
yarn application -kill application_1599402747874_0001
rm -rf /tmp/.yarn-properties-root
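The application ID in the kill command differs on every run. Instead of copying it from the UI, it can usually be read back from the properties file that yarn-session.sh leaves behind. This sketch assumes that file is a plain properties file containing a line of the form `applicationID=...`; check the file on your machine before relying on it. `session_app_id` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: read the yarn-session application id from the properties file.
# Assumption: the file holds a line "applicationID=<id>".
session_app_id() {
  local props="${1:-/tmp/.yarn-properties-root}"
  sed -n 's/^applicationID=//p' "$props"
}

# Example (commented out; needs a running session):
#   yarn application -kill "$(session_app_id)"
#   rm -rf /tmp/.yarn-properties-root
```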
5.5.2 Per-Job Detached Mode
1. Submit the job directly
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar
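# -m        address of the JobManager; the value yarn-cluster selects YARN execution mode
# -yjm 1024 memory for the JobManager (in MB by default)
# -ytm 1024 memory for each TaskManager (in MB by default)
2. View the UI: http://node1:8088/cluster
3. Note:
In earlier versions, if you have been using Flink on YARN and want to switch back to standalone mode, you may get an error. If so, delete /tmp/.yarn-properties-root, i.e. run: rm -rf /tmp/.yarn-properties-root
The reason is that, by default, the client looks up the JobManager from the yarn-session information that already exists in the current YARN cluster.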
06 Parameter Summary
[root@node1 bin]# /export/server/flink/bin/flink --help
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.
  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
    -c,--class <classname>
        Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
    -C,--classpath <url>
        Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
    -d,--detached
        If present, runs the job in detached mode
    -n,--allowNonRestoredState
        Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
    -p,--parallelism <parallelism>
        The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
    -py,--python <pythonFile>
        Python script with the program entry point. The dependent resources can be configured with the `--pyFiles` option.
    -pyarch,--pyArchives <arg>
        Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory be specified. If the target directory name is specified, the archive file will be extracted to a name can directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name.
        Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment, the data files used in Python UDF (e.g.: --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open('data/data.txt', 'r').
    -pyexec,--pyExecutable <arg>
        Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
    -pyfs,--pyFiles <pythonFiles>
        Attach custom python files for job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. Comma (',') could be used as the separator to specify multiple files (e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
    -pym,--pyModule <pythonModule>
        Python module with the program entry point. This option must be used in conjunction with `--pyFiles`.
    -pyreq,--pyRequirements <arg>
        Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
    -s,--fromSavepoint <savepointPath>
        Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).
    -sae,--shutdownOnAttachedExit
        If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
  Options for Generic CLI mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -e,--executor <arg>
        DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
    -t,--target <arg>
        The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
    -d,--detached
        If present, runs the job in detached mode
    -m,--jobmanager <arg>
        Set to yarn-cluster to use YARN execution mode.
    -yat,--yarnapplicationType <arg>
        Set a custom application type for the application on YARN
    -yD <property=value>
        use value for given property
    -yd,--yarndetached
        If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
    -yh,--yarnhelp
        Help for the Yarn session CLI.
    -yid,--yarnapplicationId <arg>
        Attach to running YARN session
    -yj,--yarnjar <arg>
        Path to Flink jar file
    -yjm,--yarnjobManagerMemory <arg>
        Memory for JobManager Container with optional unit (default: MB)
    -ynl,--yarnnodeLabel <arg>
        Specify YARN node label for the YARN application
    -ynm,--yarnname <arg>
        Set a custom name for the application on YARN
    -yq,--yarnquery
        Display available YARN resources (memory, cores)
    -yqu,--yarnqueue <arg>
        Specify YARN queue.
    -ys,--yarnslots <arg>
        Number of slots per TaskManager
    -yt,--yarnship <arg>
        Ship files in the specified directory (t for transfer)
    -ytm,--yarntaskManagerMemory <arg>
        Memory per TaskManager Container with optional unit (default: MB)
    -yz,--yarnzookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -m,--jobmanager <arg>
        Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode

Action "run-application" runs an application in Application Mode.
  Syntax: run-application [OPTIONS] <jar-file> <arguments>
  Options for Generic CLI mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -e,--executor <arg>
        DEPRECATED: Please use the -t option instead which is also available with the "Application Mode".
        The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
    -t,--target <arg>
        The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".

Action "info" shows the optimized execution plan of the program (JSON).
  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
    -c,--class <classname>
        Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
    -p,--parallelism <parallelism>
        The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.

Action "list" lists running and scheduled programs.
  Syntax: list [OPTIONS]
  "list" action options:
    -a,--all
        Show all programs and their JobIDs
    -r,--running
        Show only running programs and their JobIDs
    -s,--scheduled
        Show only scheduled programs and their JobIDs
  Options for Generic CLI mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -e,--executor <arg>
        DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
    -t,--target <arg>
        The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
    -m,--jobmanager <arg>
        Set to yarn-cluster to use YARN execution mode.
    -yid,--yarnapplicationId <arg>
        Attach to running YARN session
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -m,--jobmanager <arg>
        Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode

Action "stop" stops a running program with a savepoint (streaming jobs only).
  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:
    -d,--drain
        Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.
    -p,--savepointPath <savepointPath>
        Path to the savepoint (for example hdfs:///flink/savepoint-1537). If no directory is specified, the configured default will be used ("state.savepoints.dir").
  Options for Generic CLI mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -e,--executor <arg>
        DEPRECATED: Please use the -t option instead which is also available with the "Application Mode".
        The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
    -t,--target <arg>
        The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
    -m,--jobmanager <arg>
        Set to yarn-cluster to use YARN execution mode.
    -yid,--yarnapplicationId <arg>
        Attach to running YARN session
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -m,--jobmanager <arg>
        Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode

Action "cancel" cancels a running program.
  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
    -s,--withSavepoint <targetDirectory>
        **DEPRECATION WARNING**: Cancelling a job with savepoint is deprecated. Use "stop" instead. Trigger savepoint and cancel job. The target directory is optional. If no directory is specified, the configured default directory (state.savepoints.dir) is used.
  Options for Generic CLI mode:
    -D <property=value>
        Allows specifying multiple generic configuration options.
        The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -e,--executor <arg>
        DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
    -t,--target <arg>
        The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
    -m,--jobmanager <arg>
        Set to yarn-cluster to use YARN execution mode.
    -yid,--yarnapplicationId <arg>
        Attach to running YARN session
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -m,--jobmanager <arg>
        Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode

Action "savepoint" triggers savepoints for a running job or disposes existing ones.
  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
    -d,--dispose <arg>
        Path of savepoint to dispose.
    -j,--jarfile <jarfile>
        Flink program JAR file.
  Options for Generic CLI mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -e,--executor <arg>
        DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
    -t,--target <arg>
        The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".
  Options for yarn-cluster mode:
    -m,--jobmanager <arg>
        Set to yarn-cluster to use YARN execution mode.
    -yid,--yarnapplicationId <arg>
        Attach to running YARN session
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode:
    -D <property=value>
        Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -m,--jobmanager <arg>
        Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
    -z,--zookeeperNamespace <arg>
        Namespace to create the Zookeeper sub-paths for high availability mode
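The generic `-t` and `-D` options listed in the help output can stand in for the `-m yarn-cluster -yjm/-ytm` form used in section 5.5.2. A hedged sketch: the target name `yarn-per-job` comes from the help text, while the two memory keys (`jobmanager.memory.process.size` and `taskmanager.memory.process.size`) are an assumption about which configuration options correspond to `-yjm` and `-ytm` in this Flink version; check them against the configuration page linked in the help output. The helper name `per_job_cmd` is hypothetical, and the function only prints the command.

```shell
#!/usr/bin/env bash
# Sketch: print a Per-Job submit command using the generic -t / -D options.
# Assumption: the two memory keys below match -yjm / -ytm in your version.
per_job_cmd() {
  local jm_mb="$1" tm_mb="$2" jar="$3"
  echo "/export/server/flink/bin/flink run -t yarn-per-job" \
       "-Djobmanager.memory.process.size=${jm_mb}m" \
       "-Dtaskmanager.memory.process.size=${tm_mb}m" \
       "$jar"
}

# Example: per_job_cmd 1024 1024 /export/server/flink/examples/batch/WordCount.jar
```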
07 Closing
This article covered how to install and deploy Flink locally and on a cluster. Thanks for reading; that's all for this article!