Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

背景介绍

单机模式

在之前的章节中,已经验证过,但是实际运行的时候,我们是需要提交到服务器去运行的。


集群模式

在之前章节中,已经验证过,(方便测试) 使用了 Docker 的方式进行部署。同时也利用容器编排工具docker-compose的方式,进行集群的模式部署:


简单测试 1 x JobManager + 1 x TaskManager

集群模式 1 x JobManager + 3 x TaskManager

HA模式 2 x JobManager + Zookeeper + 3 x TaskManager

但是对于上述的 docker-compose 编排的方式,我们依然是在一台机器上的。对于实际的生产情况,我们可能需要面对多台机器进行部署,同时最好的方案是:动态扩容、动态缩容,最大程度的利用资源。


云原生部署

首先确保你有 Kubernetes

然后为了简化 这里借助 Rancher 可视化进行操作

等熟悉了,你可以使用 yaml尝试部署。

最后,生产环境可借助 Helm + Flink Operator 极大简化过程。

JobManager

需要开放的端口如下:

完整的Yaml 如下:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31120,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30985,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30166,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true}]'
  creationTimestamp: "2023-06-08T08:49:14Z"
  generation: 21
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  managedFields:
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:field.cattle.io/creatorId: {}
          f:field.cattle.io/publicEndpoints: {}
        f:labels:
          .: {}
          f:cattle.io/creator: {}
          f:workload.user.cattle.io/workloadselector: {}
      f:spec:
        f:podManagementPolicy: {}
        f:replicas: {}
        f:revisionHistoryLimit: {}
        f:selector: {}
        f:serviceName: {}
        f:template:
          f:metadata:
            f:annotations:
              .: {}
              f:cattle.io/timestamp: {}
              f:field.cattle.io/ports: {}
            f:labels:
              .: {}
              f:workload.user.cattle.io/workloadselector: {}
          f:spec:
            f:containers:
              k:{"name":"jobmanager"}:
                .: {}
                f:args: {}
                f:env:
                  .: {}
                  k:{"name":"JOB_MANAGER_RPC_ADDRESS"}:
                    .: {}
                    f:name: {}
                    f:value: {}
                f:image: {}
                f:imagePullPolicy: {}
                f:name: {}
                f:ports:
                  .: {}
                  k:{"containerPort":6123,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":6124,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":8081,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                f:resources: {}
                f:securityContext:
                  .: {}
                  f:allowPrivilegeEscalation: {}
                  f:capabilities: {}
                  f:privileged: {}
                  f:readOnlyRootFilesystem: {}
                  f:runAsNonRoot: {}
                f:stdin: {}
                f:terminationMessagePath: {}
                f:terminationMessagePolicy: {}
                f:tty: {}
            f:dnsConfig: {}
            f:dnsPolicy: {}
            f:restartPolicy: {}
            f:schedulerName: {}
            f:securityContext: {}
            f:terminationGracePeriodSeconds: {}
        f:updateStrategy:
          f:type: {}
    manager: rancher
    operation: Update
    time: "2023-06-08T11:20:03Z"
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:collisionCount: {}
        f:currentReplicas: {}
        f:currentRevision: {}
        f:observedGeneration: {}
        f:readyReplicas: {}
        f:replicas: {}
        f:updateRevision: {}
        f:updatedReplicas: {}
    manager: kube-controller-manager
    operation: Update
    time: "2024-06-27T01:24:45Z"
  name: jobmanager
  namespace: flink-server
  resourceVersion: "126393476"
  uid: 461079aa-69f6-423e-85b7-36fdc5c513e7
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  serviceName: jobmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:18:06Z"
        field.cattle.io/ports: '[[{"containerPort":8081,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm81","protocol":"TCP","sourcePort":31120},{"containerPort":6123,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm23","protocol":"TCP"},{"containerPort":6124,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm24","protocol":"TCP"}]]'
      creationTimestamp: null
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
    spec:
      containers:
      - args:
        - jobmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: jobmanager
        image: 10.10.52.8/flink/flink
        imagePullPolicy: Always
        name: jobmanager
        ports:
        - containerPort: 8081
          name: jm81
          protocol: TCP
        - containerPort: 6123
          name: jm23
          protocol: TCP
        - containerPort: 6124
          name: jm24
          protocol: TCP
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities: {}
          privileged: false
          readOnlyRootFilesystem: false
          runAsNonRoot: false
        stdin: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        tty: true
      dnsConfig: {}
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
status:
  collisionCount: 0
  currentReplicas: 1
  currentRevision: jobmanager-6ddbf8767b
  observedGeneration: 21
  readyReplicas: 1
  replicas: 1
  updateRevision: jobmanager-6ddbf8767b
  updatedReplicas: 1

TaskManager

需要开放的端口如下:

完整Yaml 如下:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31538,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30067,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true}]'
  creationTimestamp: "2023-06-08T08:49:30Z"
  generation: 10
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  managedFields:
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:field.cattle.io/creatorId: {}
          f:field.cattle.io/publicEndpoints: {}
        f:labels:
          .: {}
          f:cattle.io/creator: {}
          f:workload.user.cattle.io/workloadselector: {}
      f:spec:
        f:podManagementPolicy: {}
        f:replicas: {}
        f:revisionHistoryLimit: {}
        f:selector: {}
        f:serviceName: {}
        f:template:
          f:metadata:
            f:annotations:
              .: {}
              f:cattle.io/timestamp: {}
              f:field.cattle.io/ports: {}
              f:field.cattle.io/publicEndpoints: {}
            f:labels:
              .: {}
              f:workload.user.cattle.io/workloadselector: {}
          f:spec:
            f:containers:
              k:{"name":"taskmanager"}:
                .: {}
                f:args: {}
                f:env:
                  .: {}
                  k:{"name":"JOB_MANAGER_RPC_ADDRESS"}:
                    .: {}
                    f:name: {}
                    f:value: {}
                f:image: {}
                f:imagePullPolicy: {}
                f:name: {}
                f:ports:
                  .: {}
                  k:{"containerPort":6121,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":6122,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                f:resources: {}
                f:securityContext:
                  .: {}
                  f:allowPrivilegeEscalation: {}
                  f:privileged: {}
                  f:readOnlyRootFilesystem: {}
                  f:runAsNonRoot: {}
                f:stdin: {}
                f:terminationMessagePath: {}
                f:terminationMessagePolicy: {}
                f:tty: {}
            f:dnsPolicy: {}
            f:restartPolicy: {}
            f:schedulerName: {}
            f:securityContext: {}
            f:terminationGracePeriodSeconds: {}
        f:updateStrategy:
          f:type: {}
    manager: rancher
    operation: Update
    time: "2023-06-08T11:20:40Z"
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:collisionCount: {}
        f:currentReplicas: {}
        f:currentRevision: {}
        f:observedGeneration: {}
        f:readyReplicas: {}
        f:replicas: {}
        f:updateRevision: {}
        f:updatedReplicas: {}
    manager: kube-controller-manager
    operation: Update
    time: "2024-06-27T01:24:58Z"
  name: taskmanager
  namespace: flink-server
  resourceVersion: "126393537"
  uid: 9ddfa3b3-b497-43af-9bb6-af83ffa0aed3
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  serviceName: taskmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:19:53Z"
        field.cattle.io/ports: '[[{"containerPort":6121,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm21","protocol":"TCP"},{"containerPort":6122,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm22","protocol":"TCP"}]]'
        field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"allNodes":true,"port":31538,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"},{"addresses":["10.10.52.11"],"allNodes":true,"port":30067,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"}]'
      creationTimestamp: null
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
    spec:
      containers:
      - args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: jobmanager
        image: 10.10.52.8/flink/flink
        imagePullPolicy: Always
        name: taskmanager
        ports:
        - containerPort: 6121
          name: tm21
          protocol: TCP
        - containerPort: 6122
          name: tm22
          protocol: TCP
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: false
          runAsNonRoot: false
        stdin: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        tty: true
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
status:
  collisionCount: 0
  currentReplicas: 1
  currentRevision: taskmanager-6fd48fdb8d
  observedGeneration: 10
  readyReplicas: 1
  replicas: 1
  updateRevision: taskmanager-6fd48fdb8d
  updatedReplicas: 1

测试效果

控制台

(根据自己的情况)

http://10.10.52.11:31120/#/overview

扩容TaskManager

当我们对 TaskManager 进行扩容的时候,这里我扩展了 5个,正在逐步扩容,完毕之后 ,我们可以对应观察到控制台中的:

HA模式

同样的操作,你需要在 K8s 集群中用如下的方案来做高可用方案:

  • 3 x JobManager
  • N x TaskManager
  • Zookeeper
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
12天前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
15天前
|
消息中间件 Java Kafka
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
47 7
|
15天前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
23 7
|
15天前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
54 4
|
15天前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
30 4
|
15天前
|
传感器 Java 物联网
Flink-09 Flink Java 3分钟上手 会话窗口 SessionWindow TimeWindow CountWindow GlobalWindow
Flink-09 Flink Java 3分钟上手 会话窗口 SessionWindow TimeWindow CountWindow GlobalWindow
23 4
|
15天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
18 1
|
13天前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
27 0
|
15天前
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
21 0
|
15天前
|
消息中间件 NoSQL Kafka
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
Flink-05 Flink Java 3分钟上手 Redis FlinkJedisPoolConfig 从Kafka写入Redis FlinkKafkaConsumer消费 结果写入Redis
29 0