Code Repository
The code is synced to GitHub:
https://github.com/turbo-duck/flink-demo
Background
Standalone Mode
We verified standalone mode in earlier chapters, but for real workloads the job has to be submitted to a server to run.
Cluster Mode
Earlier chapters also covered this: for convenient testing we deployed with Docker, and used the container-orchestration tool docker-compose to bring up cluster-style deployments in three topologies (a recap sketch follows the list below):
- Simple test: 1 x JobManager + 1 x TaskManager
- Cluster mode: 1 x JobManager + 3 x TaskManager
- HA mode: 2 x JobManager + Zookeeper + 3 x TaskManager
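As a quick recap, a minimal docker-compose file for the simple-test topology looks roughly like this (a sketch: the image tag is illustrative, not necessarily the exact one used in earlier chapters):

version: "3"
services:
  jobmanager:
    image: flink:1.17                       # illustrative tag
    command: jobmanager
    ports:
      - "8081:8081"                         # Flink web UI
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.17
    command: taskmanager
    depends_on:
      - jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Moving to the cluster-mode topology is then just a matter of: docker compose up --scale taskmanager=3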
With docker-compose, however, everything above still runs on a single machine. In production we may need to deploy across multiple machines, ideally with dynamic scale-out and scale-in so that resources are used as efficiently as possible.
Cloud-Native Deployment
First, make sure you have a Kubernetes cluster available.
To keep things simple, this walkthrough uses Rancher to do everything visually.
Once you are comfortable with it, you can try deploying with plain YAML instead.
Finally, for production, the combination of Helm + the Flink Kubernetes Operator can simplify the process considerably.
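For reference, installing the operator and declaring a small cluster looks roughly like this (a sketch: the operator version, image tag, slot count, and resource figures are assumptions, not values from this setup; the operator's admission webhook also expects cert-manager to be installed first):

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-demo                   # hypothetical name
spec:
  image: flink:1.17                  # assumed image tag
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1

The operator then creates and reconciles the JobManager and TaskManager pods for you. The manual StatefulSet route is still worth understanding, and that is what the rest of this chapter walks through.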
JobManager
Ports to Expose
The JobManager container needs the following ports open (these match the container ports declared in the manifest below):
- 8081: web UI / REST endpoint
- 6123: RPC port (jobmanager.rpc.port)
- 6124: blob server port (blob.server.port)
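In Rancher these are exposed by clicking through the workload's port-mapping UI, which generates a NodePort Service behind the scenes. The plain-YAML equivalent would look roughly like this (a sketch: the Service name and the pinned nodePort come from the publicEndpoints annotation in the manifest below; the rest is assumed):

apiVersion: v1
kind: Service
metadata:
  name: jobmanager-nodeport
  namespace: flink-server
spec:
  type: NodePort
  selector:
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  ports:
    - name: jm81
      port: 8081
      targetPort: 8081
      nodePort: 31120     # pinned, so the UI is reachable on every node at :31120
    - name: jm23
      port: 6123
      targetPort: 6123    # nodePort left for the cluster to assign
    - name: jm24
      port: 6124
      targetPort: 6124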
Full YAML
The StatefulSet as exported from Rancher is shown below, with server-generated metadata (managedFields, creationTimestamp, generation, resourceVersion, uid, and the status stanza) omitted for readability:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31120,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30985,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30166,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true}]'
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  name: jobmanager
  namespace: flink-server
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  serviceName: jobmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:18:06Z"
        field.cattle.io/ports: '[[{"containerPort":8081,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm81","protocol":"TCP","sourcePort":31120},{"containerPort":6123,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm23","protocol":"TCP"},{"containerPort":6124,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm24","protocol":"TCP"}]]'
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
    spec:
      containers:
        - args:
            - jobmanager
          env:
            - name: JOB_MANAGER_RPC_ADDRESS
              value: jobmanager
          image: 10.10.52.8/flink/flink
          imagePullPolicy: Always
          name: jobmanager
          ports:
            - containerPort: 8081
              name: jm81
              protocol: TCP
            - containerPort: 6123
              name: jm23
              protocol: TCP
            - containerPort: 6124
              name: jm24
              protocol: TCP
          resources: {}
          securityContext:
            allowPrivilegeEscalation: false
            capabilities: {}
            privileged: false
            readOnlyRootFilesystem: false
            runAsNonRoot: false
          stdin: true
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          tty: true
      dnsConfig: {}
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
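One thing to note: the StatefulSet sets serviceName: jobmanager, and the TaskManagers reach the JobManager through that DNS name (the JOB_MANAGER_RPC_ADDRESS value). Rancher creates the backing Service automatically; if you apply the YAML by hand, you also need a headless Service along these lines (a sketch, with names taken from the manifest above):

apiVersion: v1
kind: Service
metadata:
  name: jobmanager          # must match serviceName in the StatefulSet
  namespace: flink-server
spec:
  clusterIP: None           # headless: gives the JobManager a stable DNS name
  selector:
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081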
TaskManager
Ports to Expose
The TaskManager container needs the following ports open, exposed through a NodePort Service in the same way as for the JobManager above:
- 6121: data port (taskmanager.data.port)
- 6122: RPC port (taskmanager.rpc.port)
Full YAML
Again as exported from Rancher, with the same server-generated metadata omitted:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31538,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30067,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true}]'
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  name: taskmanager
  namespace: flink-server
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  serviceName: taskmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:19:53Z"
        field.cattle.io/ports: '[[{"containerPort":6121,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm21","protocol":"TCP"},{"containerPort":6122,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm22","protocol":"TCP"}]]'
        field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"allNodes":true,"port":31538,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"},{"addresses":["10.10.52.11"],"allNodes":true,"port":30067,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"}]'
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
    spec:
      containers:
        - args:
            - taskmanager
          env:
            - name: JOB_MANAGER_RPC_ADDRESS
              value: jobmanager
          image: 10.10.52.8/flink/flink
          imagePullPolicy: Always
          name: taskmanager
          ports:
            - containerPort: 6121
              name: tm21
              protocol: TCP
            - containerPort: 6122
              name: tm22
              protocol: TCP
          resources: {}
          securityContext:
            allowPrivilegeEscalation: false
            privileged: false
            readOnlyRootFilesystem: false
            runAsNonRoot: false
          stdin: true
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          tty: true
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
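If you prefer to skip Rancher entirely, the two manifests can be applied by hand, roughly like this (the file names are hypothetical):

kubectl create namespace flink-server                  # once, if it does not exist yet
kubectl apply -f jobmanager.yaml -f taskmanager.yaml   # hypothetical file names
kubectl -n flink-server get pods -w                    # watch the pods come up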
Testing the Result
Console
Open the Flink web UI through the JobManager NodePort (substitute your own node address and port):
http://10.10.52.11:31120/#/overview
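Since NodePort 31120 maps to the JobManager's REST endpoint (container port 8081), you should also be able to submit a job from outside the cluster through the same address (a sketch; the jar name is a placeholder):

flink run -m 10.10.52.11:31120 flink-demo.jar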
Scaling Out TaskManagers
When we scale out the TaskManager StatefulSet (here I scaled it to 5 replicas), the pods come up one by one; once they are all ready, the corresponding Task Manager count and available slots can be observed in the web UI.
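In Rancher this is just a matter of editing the replica count in the UI; the equivalent kubectl command is:

kubectl -n flink-server scale statefulset taskmanager --replicas=5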
HA Mode
With the same operations, you can build a high-availability setup in the K8s cluster using the following layout (see the configuration sketch after this list):
- 3 x JobManager
- N x TaskManager
- Zookeeper
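On the Flink side, ZooKeeper-based HA is switched on in flink-conf.yaml with settings roughly like the following (a sketch: the quorum address, storage path, and cluster id are assumptions; the storage directory must point at shared storage, such as HDFS or S3, that all JobManagers can reach):

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.cluster-id: /flink-demo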