一、架构图
- client客户端提交任务给 JobManager
- JobManager 负责Flink集群计算资源管理, 并分发任务给TaskManager执行
- TaskManager定期向JobManager汇报状态
- Flink的TM就是运行在不同节点上JVM进程(process), 这个进程会拥有一定量的资源. 比如内存、CPU、网络、磁盘等. flink将进程的内存进行了划分到多个slot中.
上图中有两个 TaskManager, 每个 TaskManager有2个 slot的, 每个slot占有1/2的内存.
二、 集群规划
服务器 | 角色 |
---|---|
node1 | Master, slave |
node2 | slave |
node3 | slave |
三、 集群搭建
1. 修改安装目录下conf文件夹内的flink-conf.yaml配置文件,指定JobManager
cd /export/server/flink/conf
vim flink-conf.yaml
# jobManager 的IP地址
jobmanager.rpc.address: node1
# JobManager 的端口号
jobmanager.rpc.port: 6123
# JobManager的总进程内存大小
jobmanager.memory.process.size: 1024m
# TaskManager的总进程内存大小
taskmanager.memory.process.size: 1024m
# 每个 TaskManager 提供的任务 slots 数量大小
taskmanager.numberOfTaskSlots: 2
#是否进行预分配内存,默认不进行预分配,这样在我们不使用flink集群时候不会占用集群资源
taskmanager.memory.preallocate: false
# 程序默认并行计算的个数
parallelism.default: 1
#JobManager的Web界面的端口(默认:8081)
rest.port: 8081
小结
taskmanager.numberOfTaskSlots:2
每一个taskmanager中的分配2个TaskSlot,3个taskmanager一共有6个TaskSlot
parallelism.default:1 运行程序默认的并行度为1,6个TaskSlot只用了1个,有5个空闲
==slot==是静态的概念,是指taskmanager具有的最大并发执行能力
==parallelism==是动态的概念,是指程序运行时实际使用的并发能力
2. 修改安装目录下conf文件夹内的workers配置文件,指定TaskManager
cd /export/server/flink/conf
vim workers
node1
node2
node3
3. 使用vi修改 /etc/profile 系统环境变量配置文件,添加HADOOP_CONF_DIR目录
vim /etc/profile
export HADOOP_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
注意: 必须验证 hadoop 路径是否正确
4. 分发/etc/profile到其他两个节点
scp -r /etc/profile node2:/etc
scp -r /etc/profile node3:/etc
5. 每个节点重新加载环境变量
source /etc/profile
6. 将配置好的Flink目录分发给其他的两台节点
scp -r flink-1.14.0/ node2:/export/server/
scp -r flink-1.14.0/ node3:/export/server/
在node2和node3上设置flink的快捷方式
ln -s /export/server/flink-1.14.0/ /export/server/flink
cd /export/server/ && ll | grep flink