Flink on k8s: a complete JobManager HA deployment

Summary: a complete deployment of Flink on Kubernetes (k8s) with JobManager high availability (HA).

1: Prepare the image; this part integrates the HDFS-related configuration:

Download the Hadoop dependency archive:

mwget https://archive.apache.org/dist/hadoop/common/hadoop-2

Extract the Hadoop archive.

After extracting, go into the etc/hadoop/ directory and replace the core-site.xml, hdfs-site.xml, yarn-site.xml, and mapred-site.xml files with the ones from your cluster.

If needed, add the corresponding JARs under the lib directory, then repackage everything into a .tar.gz file.
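
A minimal sketch of these steps, assuming Hadoop 2.8.3 (to match the tarball copied in by the Dockerfile below) and that your cluster's configuration files are collected under ./conf/:

# download and unpack Hadoop (plain wget works as well as mwget)
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz
tar -xzf hadoop-2.8.3.tar.gz

# replace the configuration files with the ones from your cluster
cp ./conf/core-site.xml ./conf/hdfs-site.xml ./conf/yarn-site.xml ./conf/mapred-site.xml \
   hadoop-2.8.3/etc/hadoop/

# optionally add extra JARs (for example under share/hadoop/common/lib/), then repackage
tar -czf hadoop-2.8.3.tar.gz hadoop-2.8.3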

Download the other packages into the same directory as the Dockerfile: gosu.tgz, jemalloc-5.2.1.tar.gz, jdk1.8.0_181-cloudera.tgz, the Flink and Hadoop tarballs, the Hadoop JARs, and docker-entrypoint.sh (see the COPY lines in the Dockerfile below).

Edit the Dockerfile:

###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
FROM centos:7
# Install dependencies
RUN set -ex; \
  yum -y install wget; \
  cd /etc/yum.repos.d/; \
  mv CentOS-Base.repo CentOS-Base.repo.bak; \
  wget http://mirrors.aliyun.com/repo/Centos-7.repo; \
  yum clean all; \
  yum makecache; \
  yum -y update; \
  yum install -y bzip2 gettext make autogen autoconf net-tools gcc-c++ telnet;
# Prepare environment
ENV FLINK_HOME=/opt/flink
ENV PATH=$FLINK_HOME/bin:$PATH
RUN set -ex; \
    mkdir $FLINK_HOME;
RUN groupadd --system --gid=9999 flink && \
    useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink
# Install gosu
COPY ./gosu.tgz /opt/gosu/gosu.tgz
RUN set -ex; \
    cd /opt/gosu; \
    tar -xf /opt/gosu/gosu.tgz --strip-components=1 \
    && /opt/gosu/gosu.install.sh \
    && rm -fr /opt/gosu
# Install jemalloc-5.2.1.tar.gz
COPY ./jemalloc-5.2.1.tar.gz /opt/jemalloc/jemalloc-5.2.1.tar.gz
RUN set -ex; \
    cd /opt/jemalloc; \
    tar -xf /opt/jemalloc/jemalloc-5.2.1.tar.gz --strip-components=1 \
    && ./configure --prefix=/usr/lib/x86_64-linux-gnu \
    && make \
    && make install
# Install jdk-8
RUN set -ex; \
    mkdir /usr/java;
ENV JAVA_HOME=/usr/java/
ENV PATH=$JAVA_HOME/bin:$PATH
COPY ./jdk1.8.0_181-cloudera.tgz /tmp/jdk.tgz
RUN set -ex; \
    cd /usr/java; \
    tar -xf /tmp/jdk.tgz --strip-components=1; \
    rm /tmp/jdk.tgz; \
    \
    chown -R flink:flink .;
# Install Flink
WORKDIR $FLINK_HOME
# the flink-1.13.1_dz.tar.gz here must match the name of your Flink package
COPY ./flink-1.13.1_dz.tar.gz flink.tgz
RUN set -ex; \
    tar -xf flink.tgz --strip-components=1; \
    rm flink.tgz; \
    \
    chown -R flink:flink .;
# Prepare environment
ENV HADOOP_HOME=/opt/hadoop
ENV PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
COPY hadoop-2.8.3.tar.gz hadoop-2.8.3.tgz
RUN set -ex; \
    tar -xf hadoop-2.8.3.tgz --strip-components=1; \
    rm hadoop-2.8.3.tgz; \
    chown -R flink:flink .;
# add hadoop conf or jar
COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib
COPY hadoop-client-2.8.3.jar /opt/flink/lib
COPY hadoop-common-2.8.3.jar /opt/flink/lib
COPY hadoop-hdfs-2.8.3.jar /opt/flink/lib
COPY hadoop-mapreduce-client-common-2.8.3.jar /opt/flink/lib
COPY hadoop-mapreduce-client-core-2.8.3.jar /opt/flink/lib
COPY hadoop-mapreduce-client-jobclient-2.8.3.jar /opt/flink/lib
# Configure container
COPY docker-entrypoint.sh /
#ENTRYPOINT ["/docker-entrypoint.sh"]
#CMD /docker-entrypoint.sh jobmanager
EXPOSE 6123 8081

Build the image:

docker build -t <image-name>:<tag> .             # e.g. docker build -t flink_k8s:v1.13 .
docker images                                    # list images
docker run -it flink_k8s:v1.13                   # run a container
docker exec -u 0 -it <container-name> /bin/bash  # open a shell in the container as root
...

Push/pull the image

docker login <registry-address>                                         # log in to the image registry
docker tag <image>:<tag> <registry-address>/<namespace>/<image>:<tag>   # tag the image
docker push <registry-address>/<namespace>/<image>:<tag>                # push the image
docker pull <registry-address>/<namespace>/<image>:<tag>                # pull the image
docker logout <registry-address>                                        # log out of the image registry
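
For example, with the private registry used by the deployment manifests later in this article (the big-data namespace comes from those manifests; adjust the tag to your own build):

docker login registry-jf.sensedeal.wiki:9443
docker tag flink_k8s:v1.13 registry-jf.sensedeal.wiki:9443/big-data/flink_k8s:v1.13
docker push registry-jf.sensedeal.wiki:9443/big-data/flink_k8s:v1.13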

Flink on k8s JobManager HA YAML configuration

The configuration directory contains the following manifests (they are applied in section 6 below):

flink-configuration-configmap.yaml
jobmanager-rest-service.yaml
jobmanager-session-deployment-ha.yaml
taskmanager-query-state-service.yaml
taskmanager-session-deployment-ha.yaml

1: Configuration files that the JobManager and TaskManager need at runtime, such as flink-conf.yaml, hdfs-site.xml, core-site.xml, and log4j-console.properties, can be defined as a ConfigMap through the flink-configuration-configmap.yaml file so that the configuration is passed to and read by the pods. If you use the default configuration, this step is not needed. A template follows:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: sd-bigdata
  labels:
    app: flink
data:  # two configuration files are mounted (flink-conf.yaml and log4j-console.properties)
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 20480m
    taskmanager.numberOfTaskSlots: 18
    parallelism.default: 1
    classloader.resolve-order: parent-first
    # classloader.resolve-order: whether Flink uses a child-first or parent-first
    # ClassLoader when loading user-code classes; valid values are parent-first and
    # child-first (default: child-first). parent-first is recommended here.
    # HA-related configuration
    kubernetes.cluster-id: sdai-cluster-test
    # the cluster id does not support characters such as _, *, or .; use - instead
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    # HDFS directory for HA metadata
    high-availability.storageDir: hdfs://sd-cluster-03:8020/flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    # checkpoint-related settings
    state.backend: filesystem
    # HDFS checkpoint directory
    state.checkpoints.dir: hdfs://sd-cluster-03:8020/data/flink/checkpoints
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

2: jobmanager-rest-service.yaml. An optional service that exposes the JobManager REST port as a NodePort on the Kubernetes nodes.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
  namespace: sd-bigdata
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081            # web UI access port
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: sd-bigdata
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  selector:
    app: flink
    component: jobmanager
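
Once the NodePort service is applied, the web UI should be reachable on any cluster node at port 30081. A quick check, assuming your machine can reach a node IP:

# list node IPs
kubectl get nodes -o wide

# open http://<node-ip>:30081 in a browser, or probe the REST API
curl -s http://<node-ip>:30081/overview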

3:jobmanager-session-deployment-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: sd-bigdata
spec:
  replicas: 3 # Set the value to greater than 1 to start standby JobManagers; an odd number is recommended
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      hostAliases:
      - ip: "47.92.212.xx"     # kafka ip 地址
        hostnames:
        - "sd-kafka001"
        - "kafka001"
      - ip: "39.99.227.xx"
        hostnames:
        - "sd-kafka002"
        - "kafka002"
      - ip: "39.99.158.xx"
        hostnames:
        - "sd-kafka003"
        - "kafka003"
      - ip: "192.168.1.xx"      # hdfs ip 地址              
        hostnames:
        - "sd-cluster-03"
      - ip: "192.168.1.xx"
        hostnames:
        - "sd-cluster-04"
      - ip: "192.168.1.xx"
        hostnames:
        - "sd-cluster-05"
      containers:
      - name: jobmanager       # container name (role)
        # image:  registry-jf.sensedeal.wiki:9443/big-data/hdfs_flink:v8
        image: registry-jf.sensedeal.wiki:9443/big-data/flink_k8s:v1.40    # image name
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
        args: ["jobmanager", "$(POD_IP)"]             # 这两⾏不能省略
        # args: ["jobmanager"]
        command: ["/docker-entrypoint.sh"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

Note: the serviceAccountName must have the required permissions.

4: taskmanager-query-state-service.yaml. An optional service that exposes the TaskManager port as a NodePort on the Kubernetes nodes so that queryable state can be accessed.

If you create a NodePort service for it, you can access the TaskManager's queryable state:

   1. Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service for the taskmanager pods; an example taskmanager-query-state-service.yaml is shown below.

   2. Run kubectl get svc flink-taskmanager-query-state to get the <node-port> of this service (see the sketch after the service definition below). You can then create a QueryableStateClient(<public-node-ip>, <node-port>) to submit state queries.


taskmanager-query-state-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
  namespace: sd-bigdata
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
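
With this service applied, the node port from step 2 can also be read directly, for example:

kubectl -n sd-bigdata get svc flink-taskmanager-query-state \
  -o jsonpath='{.spec.ports[0].nodePort}'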

taskmanager-session-deployment-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: sd-bigdata
spec:
  replicas: 4
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      hostAliases:
      - ip: "47.92.212.xx"
        hostnames:
        - "sd-kafka001"
        - "kafka001"
      - ip: "39.99.227.xx"
        hostnames:
        - "sd-kafka002"
        - "kafka002"
      - ip: "39.99.158.xx"
        hostnames:
        - "sd-kafka003"
        - "kafka003"
      - ip: "192.168.1.xx"
        hostnames:
        - "sd-cluster-03"
      - ip: "192.168.1.xx"
        hostnames:
        - "sd-cluster-04"
      - ip: "192.168.1.xx"
        hostnames:
        - "sd-cluster-05"
      containers:
      - name: taskmanager
        # image: registry-jf.sensedeal.wiki:9443/big-data/hdfs_flink:v8
        image: registry-jf.sensedeal.wiki:9443/big-data/flink_k8s:v1.40
        args: ["taskmanager"]
        command: ["/docker-entrypoint.sh"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account          # required
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

In addition, you must start the JobManager and TaskManager pods with a service account that has permission to create, edit, and delete ConfigMaps. For more information, see how to configure service accounts for pods.

When high availability is enabled, Flink uses its own HA services for service discovery. Therefore, the JobManager pods should use their pod IP address, rather than the Kubernetes service, as jobmanager.rpc.address.
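
With Kubernetes HA enabled as configured above, Flink stores leader and job metadata in ConfigMaps whose names start with the kubernetes.cluster-id. A rough way to verify HA and failover (the pod name below is a placeholder):

# the HA ConfigMaps should appear in the deployment namespace
kubectl -n sd-bigdata get configmaps | grep sdai-cluster-test

# delete the current leader pod; one of the standby JobManagers should take over
kubectl -n sd-bigdata delete pod <leader-jobmanager-pod-name>
kubectl -n sd-bigdata get pods -w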


5: The serviceAccountName requires RBAC configuration from the ops team:

role-sd-bigdata.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-sa
  namespace: default
rules:
- apiGroups:
  - '*'
  resources:
  - '*'
  verbs:
  - "*"

rolebinding-flink-sa-default.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-sa-default
  namespace: default
roleRef:
   apiGroup: rbac.authorization.k8s.io
   kind: Role
   name: flink-sa 
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: sd-bigdata
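
The manifests above bind permissions to a ServiceAccount named flink-service-account in sd-bigdata but do not create it. A sketch of how it might be provisioned (the exact procedure depends on how your ops team manages RBAC):

# create the service account referenced by the deployments
kubectl -n sd-bigdata create serviceaccount flink-service-account

# apply the Role and RoleBinding shown above
kubectl apply -f role-sd-bigdata.yaml
kubectl apply -f rolebinding-flink-sa-default.yaml

Note that a RoleBinding only grants access to resources in its own namespace; since the HA ConfigMaps are created in the namespace where the JobManager runs (sd-bigdata here), make sure the granted permissions actually cover that namespace.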

6: Start the services:

Switch to a user with the required permissions, then run the following commands:

Start:

 kubectl apply -f flink-configuration-configmap.yaml -n namespace
 kubectl apply -f jobmanager-rest-service.yaml -n namespace
 kubectl apply -f jobmanager-session-deployment-ha.yaml -n namespace
 kubectl apply -f taskmanager-query-state-service.yaml -n namespace
 kubectl apply -f taskmanager-session-deployment-ha.yaml -n namespace
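
Optionally, wait for the deployments to finish rolling out before checking further (using the sd-bigdata namespace from the manifests):

kubectl -n sd-bigdata rollout status deployment/flink-jobmanager
kubectl -n sd-bigdata rollout status deployment/flink-taskmanager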

Delete:

 kubectl delete -f flink-configuration-configmap.yaml -n namespace
 kubectl delete -f jobmanager-rest-service.yaml -n namespace
 kubectl delete -f jobmanager-session-deployment-ha.yaml -n namespace
 kubectl delete -f taskmanager-query-state-service.yaml -n namespace
 kubectl delete -f taskmanager-session-deployment-ha.yaml -n namespace

Check:

kubectl get pods -n namespace


kubectl get svc -n namespace


kubectl get all -n namespace
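
Once everything is running, jobs can be submitted to the session cluster through the REST endpoint exposed on NodePort 30081, either from the web UI or with a local Flink client. A sketch, assuming a Flink 1.13 client and a reachable node IP:

# submit the bundled WordCount example to the session cluster
./bin/flink run -m <node-ip>:30081 ./examples/streaming/WordCount.jar

# list running jobs
./bin/flink list -m <node-ip>:30081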










