Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排

简介: Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

背景介绍

单机模式

在之前的章节中,已经验证过,但是实际运行的时候,我们是需要提交到服务器去运行的。


集群模式

在之前章节中,已经验证过,(方便测试) 使用了 Docker 的方式进行部署。同时也利用容器编排工具docker-compose的方式,进行集群的模式部署:


简单测试 1 x JobManager + 1 x TaskManager

集群模式 1 x JobManager + 3 x TaskManager

HA模式 2 x JobManager + Zookeeper + 3 x TaskManager

但是对于上述的 docker-compose 编排的方式,我们依然是在一台机器上的。对于实际的生产情况,我们可能需要面对多台机器进行部署,同时最好的方案是:动态扩容、动态缩容,最大程度的利用资源。


云原生部署

首先确保你有 Kubernetes

然后为了简化 这里借助 Rancher 可视化进行操作

等熟悉了,你可以使用 yaml尝试部署。

最后,生产环境可借助 Helm + Flink Operator 极大简化过程。

JobManager

需要开放的端口如下:

完整的Yaml 如下:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31120,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30985,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30166,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true}]'
  creationTimestamp: "2023-06-08T08:49:14Z"
  generation: 21
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  managedFields:
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:field.cattle.io/creatorId: {}
          f:field.cattle.io/publicEndpoints: {}
        f:labels:
          .: {}
          f:cattle.io/creator: {}
          f:workload.user.cattle.io/workloadselector: {}
      f:spec:
        f:podManagementPolicy: {}
        f:replicas: {}
        f:revisionHistoryLimit: {}
        f:selector: {}
        f:serviceName: {}
        f:template:
          f:metadata:
            f:annotations:
              .: {}
              f:cattle.io/timestamp: {}
              f:field.cattle.io/ports: {}
            f:labels:
              .: {}
              f:workload.user.cattle.io/workloadselector: {}
          f:spec:
            f:containers:
              k:{"name":"jobmanager"}:
                .: {}
                f:args: {}
                f:env:
                  .: {}
                  k:{"name":"JOB_MANAGER_RPC_ADDRESS"}:
                    .: {}
                    f:name: {}
                    f:value: {}
                f:image: {}
                f:imagePullPolicy: {}
                f:name: {}
                f:ports:
                  .: {}
                  k:{"containerPort":6123,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":6124,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":8081,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                f:resources: {}
                f:securityContext:
                  .: {}
                  f:allowPrivilegeEscalation: {}
                  f:capabilities: {}
                  f:privileged: {}
                  f:readOnlyRootFilesystem: {}
                  f:runAsNonRoot: {}
                f:stdin: {}
                f:terminationMessagePath: {}
                f:terminationMessagePolicy: {}
                f:tty: {}
            f:dnsConfig: {}
            f:dnsPolicy: {}
            f:restartPolicy: {}
            f:schedulerName: {}
            f:securityContext: {}
            f:terminationGracePeriodSeconds: {}
        f:updateStrategy:
          f:type: {}
    manager: rancher
    operation: Update
    time: "2023-06-08T11:20:03Z"
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:collisionCount: {}
        f:currentReplicas: {}
        f:currentRevision: {}
        f:observedGeneration: {}
        f:readyReplicas: {}
        f:replicas: {}
        f:updateRevision: {}
        f:updatedReplicas: {}
    manager: kube-controller-manager
    operation: Update
    time: "2024-06-27T01:24:45Z"
  name: jobmanager
  namespace: flink-server
  resourceVersion: "126393476"
  uid: 461079aa-69f6-423e-85b7-36fdc5c513e7
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  serviceName: jobmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:18:06Z"
        field.cattle.io/ports: '[[{"containerPort":8081,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm81","protocol":"TCP","sourcePort":31120},{"containerPort":6123,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm23","protocol":"TCP"},{"containerPort":6124,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm24","protocol":"TCP"}]]'
      creationTimestamp: null
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
    spec:
      containers:
      - args:
        - jobmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: jobmanager
        image: 10.10.52.8/flink/flink
        imagePullPolicy: Always
        name: jobmanager
        ports:
        - containerPort: 8081
          name: jm81
          protocol: TCP
        - containerPort: 6123
          name: jm23
          protocol: TCP
        - containerPort: 6124
          name: jm24
          protocol: TCP
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities: {}
          privileged: false
          readOnlyRootFilesystem: false
          runAsNonRoot: false
        stdin: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        tty: true
      dnsConfig: {}
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
status:
  collisionCount: 0
  currentReplicas: 1
  currentRevision: jobmanager-6ddbf8767b
  observedGeneration: 21
  readyReplicas: 1
  replicas: 1
  updateRevision: jobmanager-6ddbf8767b
  updatedReplicas: 1

TaskManager

需要开放的端口如下:

完整Yaml 如下:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31538,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30067,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true}]'
  creationTimestamp: "2023-06-08T08:49:30Z"
  generation: 10
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  managedFields:
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:field.cattle.io/creatorId: {}
          f:field.cattle.io/publicEndpoints: {}
        f:labels:
          .: {}
          f:cattle.io/creator: {}
          f:workload.user.cattle.io/workloadselector: {}
      f:spec:
        f:podManagementPolicy: {}
        f:replicas: {}
        f:revisionHistoryLimit: {}
        f:selector: {}
        f:serviceName: {}
        f:template:
          f:metadata:
            f:annotations:
              .: {}
              f:cattle.io/timestamp: {}
              f:field.cattle.io/ports: {}
              f:field.cattle.io/publicEndpoints: {}
            f:labels:
              .: {}
              f:workload.user.cattle.io/workloadselector: {}
          f:spec:
            f:containers:
              k:{"name":"taskmanager"}:
                .: {}
                f:args: {}
                f:env:
                  .: {}
                  k:{"name":"JOB_MANAGER_RPC_ADDRESS"}:
                    .: {}
                    f:name: {}
                    f:value: {}
                f:image: {}
                f:imagePullPolicy: {}
                f:name: {}
                f:ports:
                  .: {}
                  k:{"containerPort":6121,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":6122,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                f:resources: {}
                f:securityContext:
                  .: {}
                  f:allowPrivilegeEscalation: {}
                  f:privileged: {}
                  f:readOnlyRootFilesystem: {}
                  f:runAsNonRoot: {}
                f:stdin: {}
                f:terminationMessagePath: {}
                f:terminationMessagePolicy: {}
                f:tty: {}
            f:dnsPolicy: {}
            f:restartPolicy: {}
            f:schedulerName: {}
            f:securityContext: {}
            f:terminationGracePeriodSeconds: {}
        f:updateStrategy:
          f:type: {}
    manager: rancher
    operation: Update
    time: "2023-06-08T11:20:40Z"
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:collisionCount: {}
        f:currentReplicas: {}
        f:currentRevision: {}
        f:observedGeneration: {}
        f:readyReplicas: {}
        f:replicas: {}
        f:updateRevision: {}
        f:updatedReplicas: {}
    manager: kube-controller-manager
    operation: Update
    time: "2024-06-27T01:24:58Z"
  name: taskmanager
  namespace: flink-server
  resourceVersion: "126393537"
  uid: 9ddfa3b3-b497-43af-9bb6-af83ffa0aed3
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  serviceName: taskmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:19:53Z"
        field.cattle.io/ports: '[[{"containerPort":6121,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm21","protocol":"TCP"},{"containerPort":6122,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm22","protocol":"TCP"}]]'
        field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"allNodes":true,"port":31538,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"},{"addresses":["10.10.52.11"],"allNodes":true,"port":30067,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"}]'
      creationTimestamp: null
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
    spec:
      containers:
      - args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: jobmanager
        image: 10.10.52.8/flink/flink
        imagePullPolicy: Always
        name: taskmanager
        ports:
        - containerPort: 6121
          name: tm21
          protocol: TCP
        - containerPort: 6122
          name: tm22
          protocol: TCP
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: false
          runAsNonRoot: false
        stdin: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        tty: true
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
status:
  collisionCount: 0
  currentReplicas: 1
  currentRevision: taskmanager-6fd48fdb8d
  observedGeneration: 10
  readyReplicas: 1
  replicas: 1
  updateRevision: taskmanager-6fd48fdb8d
  updatedReplicas: 1

测试效果

控制台

(根据自己的情况)

http://10.10.52.11:31120/#/overview

扩容TaskManager

当我们对 TaskManager 进行扩容的时候,这里我扩展了 5个,正在逐步扩容,完毕之后 ,我们可以对应观察到控制台中的:

HA模式

同样的操作,你需要在 K8s 集群中用如下的方案来做高可用方案:

  • 3 x JobManager
  • N x TaskManager
  • Zookeeper
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
5月前
|
人工智能 算法 调度
阿里云ACK托管集群Pro版共享GPU调度操作指南
本文介绍在阿里云ACK托管集群Pro版中,如何通过共享GPU调度实现显存与算力的精细化分配,涵盖前提条件、使用限制、节点池配置及任务部署全流程,提升GPU资源利用率,适用于AI训练与推理场景。
473 2
|
5月前
|
弹性计算 监控 调度
ACK One 注册集群云端节点池升级:IDC 集群一键接入云端 GPU 算力,接入效率提升 80%
ACK One注册集群节点池实现“一键接入”,免去手动编写脚本与GPU驱动安装,支持自动扩缩容与多场景调度,大幅提升K8s集群管理效率。
328 89
存储 jenkins 持续交付
737 2
|
10月前
|
资源调度 Kubernetes 调度
从单集群到多集群的快速无损转型:ACK One 多集群应用分发
本文介绍如何利用阿里云的分布式云容器平台ACK One的多集群应用分发功能,结合云效CD能力,快速将单集群CD系统升级为多集群CD系统。通过增加分发策略(PropagationPolicy)和差异化策略(OverridePolicy),并修改单集群kubeconfig为舰队kubeconfig,可实现无损改造。该方案具备多地域多集群智能资源调度、重调度及故障迁移等能力,帮助用户提升业务效率与可靠性。
|
10月前
|
资源调度 Kubernetes 调度
从单集群到多集群的快速无损转型:ACK One 多集群应用分发
ACK One 的多集群应用分发,可以最小成本地结合您已有的单集群 CD 系统,无需对原先应用资源 YAML 进行修改,即可快速构建成多集群的 CD 系统,并同时获得强大的多集群资源调度和分发的能力。
559 9
|
10月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
安全 算法 Java
【Java集合类面试二】、 Java中的容器,线程安全和线程不安全的分别有哪些?
这篇文章讨论了Java集合类的线程安全性,列举了线程不安全的集合类(如HashSet、ArrayList、HashMap)和线程安全的集合类(如Vector、Hashtable),同时介绍了Java 5之后提供的java.util.concurrent包中的高效并发集合类,如ConcurrentHashMap和CopyOnWriteArrayList。
【Java集合类面试二】、 Java中的容器,线程安全和线程不安全的分别有哪些?
|
存储 算法 Java
盘点Java集合(容器)概览,Collection和Map在开发中谁用的最多?
盘点Java集合(容器)概览,Collection和Map在开发中谁用的最多?
211 0
|
安全 算法 Java
安全无忧:Java并发集合容器的应用与实践
安全无忧:Java并发集合容器的应用与实践
162 0
安全无忧:Java并发集合容器的应用与实践
|
存储 算法 安全
面霸篇:Java 核心集合容器全解(核心卷二)
面霸篇:Java 核心集合容器全解(核心卷二)
189 0

推荐镜像

更多