1: 编辑镜像,本部分集成了hdfs 相关配置:
下载hadoop 依赖压缩包
mwget https://archive.apache.org/dist/hadoop/common/hadoop-2
解压hadoop 压缩包
若有需要,可以在lib 包下添加相应jar包,完成后打包成.tar.gz⽂件。
############################################################################### # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ############################################################################### FROM centos:7 # Install dependencies RUN set -ex; \ yum -y install wget; \ cd /etc/yum.repos.d/; \ mv CentOS-Base.repo CentOS-Base.repo.bak; \ wget http://mirrors.aliyun.com/repo/Centos-7.repo; \ yum clean all; \ yum makecache; \ yum -y update; \ yum install -y bzip2 gettext make autogen autoconf net-tools gcc-c++ telnet; # Prepare environment ENV FLINK_HOME=/opt/flink ENV PATH=$FLINK_HOME/bin:$PATH RUN set -ex; \ mkdir $FLINK_HOME; RUN groupadd --system --gid=9999 flink && \ useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink # Install gosu COPY ./gosu.tgz /opt/gosu/gosu.tgz RUN set -ex; \ cd /opt/gosu; \ tar -xf /opt/gosu/gosu.tgz --strip-components=1 \ && /opt/gosu/gosu.install.sh \ && rm -fr /opt/gosu # Install jemalloc-5.2.1.tar.gz COPY ./jemalloc-5.2.1.tar.gz /opt/jemolloc/jemalloc-5.2.1.tar.gz RUN set -ex; \ cd /opt/jemolloc; \ tar -xf /opt/jemolloc/jemalloc-5.2.1.tar.gz --strip-components=1 \ && ./configure --prefix=/usr/lib/x86_64-linux-gnu \ && make \ && make install # Install jdk-8 RUN set -ex; \ mkdir /usr/java; ENV JAVA_HOME=/usr/java/ ENV PATH=$JAVA_HOME/bin:$PATH COPY ./jdk1.8.0_181-cloudera.tgz /tmp/jdk.tgz RUN set -ex; \ cd /usr/java; \ tar -xf /tmp/jdk.tgz --strip-components=1; \ rm /tmp/jdk.tgz; \ \ chown -R flink:flink .; # Install Flink WORKDIR $FLINK_HOME COPY ./flink-1.13.1_dz.tar.gz flink.tgz // 此处对应的flink-1.13.1_dz.tar.gz 需要安装包名字⼀致 RUN set -ex; \ tar -xf flink.tgz --strip-components=1; \ rm flink.tgz; \ \ chown -R flink:flink .; # Prepare environment ENV HADOOP_HOME=/opt/hadoop ENV PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH COPY hadoop-2.8.3.tar.gz hadoop-2.8.3.tgz RUN set -ex; \ tar -xf hadoop-2.8.3.tgz --strip-components=1; \ rm hadoop-2.8.3.tgz; \ chown -R flink:flink .; # add hadoop conf or jar COPY flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib COPY hadoop-client-2.8.3.jar /opt/flink/lib COPY hadoop-common-2.8.3.jar /opt/flink/lib COPY hadoop-hdfs-2.8.3.jar /opt/flink/lib COPY hadoop-mapreduce-client-common-2.8.3.jar /opt/flink/lib COPY hadoop-mapreduce-client-core-2.8.3.jar /opt/flink/lib COPY hadoop-mapreduce-client-jobclient-2.8.3.jar /opt/flink/lib # Configure container COPY docker-entrypoint.sh / #ENTRYPOINT ["/docker-entrypoint.sh"] #CMD /docker-entrypoint.sh jobmanager EXPOSE 6123 8081
docker build -t 镜像名:版本号 . 如:flink_k8s:v1.13 . docker iamges (查看镜像) docker run -it flink_k8s:v1.13 (运⾏⼀个容器) docker exec -u 0 -it 容器名 /bin/bash 以root权限 ...
docker login 仓库地址 登陆镜像仓库 docker tag 镜像名:版本号 仓库地址/namespace/镜像名:版本号 打tag docker push 镜像地址/namespace/镜像名:版本号 推送镜像 docker pull 仓库地址/namespace/镜像名:版本号 (拉取镜像) docker logout 镜像地址 登出镜像仓库
flink on k8s JM HA yaml配置
1:对于 JobManager 和 TaskManager 运⾏过程中需要的⼀些配置⽂件,如:flink-conf.yaml、hdfssite.xml、core-site.xml,log4j-console.properties....,可以通过flink-configuration-configmap.yaml⽂件将它们定义为 ConfigMap 来实现配置的传递和读取。如果使⽤默认配置,这⼀步则不需要。模版如下:
apiVersion: v1 kind: ConfigMap metadata: name: flink-config namespace: sd-bigdata labels: app: flink data: //挂载了两个配置⽂件(flink-conf.yaml,log4j-console.properties) flink-conf.yaml: |+ jobmanager.rpc.address: flink-jobmanager taskmanager.numberOfTaskSlots: 2 blob.server.port: 6124 jobmanager.rpc.port: 6123 taskmanager.rpc.port: 6122 queryable-state.proxy.ports: 6125 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 20480m taskmanager.numberOfTaskSlots: 18 parallelism.default: 1 classloader.resolve-order: parent-first # classloader.resolve-order: 当加载⽤户代码类时,Flink使⽤child-first的 # ClassLoader还是parent-first ClassLoader。可以是parent-first 或 child-first中的⼀个 # 值。(默认:child-first)---->建议使⽤parent-first。 ## ha 相关配置参数 kubernetes.cluster-id: sdai-cluster-test # 不⽀持_,*,.等符号,建议使⽤- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory # hdfs ⽬录 high-availability.storageDir: hdfs://sd-cluster-03:8020/flink/recovery restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 10 # chekpoin 相关参数 state.backend: filesystem # hdfs checkpoint state.checkpoints.dir: hdfs://sd-cluster-03:8020/data/flink/checkpoints log4j-console.properties: |+ # This affects logging for both user code and Flink rootLogger.level = INFO rootLogger.appenderRef.console.ref = ConsoleAppender rootLogger.appenderRef.rolling.ref = RollingFileAppender # Uncomment this if you want to _only_ change Flink's logging #logger.flink.name = org.apache.flink #logger.flink.level = INFO # The following lines keep the log level of common libraries/connectors on # log level INFO. The root logger does not override this. You have to manually # change the log levels here. logger.akka.name = akka logger.akka.level = INFO logger.kafka.name= org.apache.kafka logger.kafka.level = INFO logger.hadoop.name = org.apache.hadoop logger.hadoop.level = INFO logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO # Log all infos to the console appender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Log all infos in the given rolling file appender.rolling.name = RollingFileAppender appender.rolling.type = RollingFile appender.rolling.append = false appender.rolling.fileName = ${sys:log.file} appender.rolling.filePattern = ${sys:log.file}.%i appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF
2:jobmanager-rest-service.yaml. 可选服务,将 jobmanager rest 端⼝公开为公共 Kubernetes 节点的端⼝。
apiVersion: v1 kind: Service metadata: name: flink-jobmanager-rest namespace: sd-bigdata spec: type: NodePort ports: - name: rest port: 8081 targetPort: 8081 nodePort: 30081 # web ui访问端⼝ selector: app: flink component: jobmanager --- apiVersion: v1 kind: Service metadata: name: flink-jobmanager namespace: sd-bigdata spec: ports: - name: rpc port: 6123 - name: blob-server port: 6124 selector: app: flink component: jobmanager
apiVersion: apps/v1 kind: Deployment metadata: name: flink-jobmanager namespace: sd-bigdata spec: replicas: 3 # Set the value to greater than 1 to start standby JobManagers,建议设为奇数个 selector: matchLabels: app: flink component: jobmanager template: metadata: labels: app: flink component: jobmanager spec: hostAliases: - ip: "47.92.212.xx" # kafka ip 地址 hostnames: - "sd-kafka001" - "kafka001" - ip: "39.99.227.xx" hostnames: - "sd-kafka002" - "kafka002" - ip: "39.99.158.xx" hostnames: - "sd-kafka003" - "kafka003" - ip: "192.168.1.xx" # hdfs ip 地址 hostnames: - "sd-cluster-03" - ip: "192.168.1.xx" hostnames: - "sd-cluster-04" - ip: "192.168.1.xx" hostnames: - "sd-cluster-05" containers: - name: jobmanager # 容器名(角色名) # image: registry-jf.sensedeal.wiki:9443/big-data/hdfs_flink:v8 image: registry-jf.sensedeal.wiki:9443/big-data/flink_k8s:v1.40 # 镜像名 env: - name: POD_IP valueFrom: fieldRef: apiVersion: v1 fieldPath: status.podIP # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP. args: ["jobmanager", "$(POD_IP)"] # 这两⾏不能省略 # args: ["jobmanager"] command: ["/docker-entrypoint.sh"] ports: - containerPort: 6123 name: rpc - containerPort: 6124 name: blob-server - containerPort: 8081 name: webui livenessProbe: tcpSocket: port: 6123 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: - name: flink-config-volume mountPath: /opt/flink/conf securityContext: runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps volumes: - name: flink-config-volume configMap: name: flink-config items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties path: log4j-console.properties
意:serviceAccountName 必须具有相关权限。
3:Taskmanager-query-state-service.yaml 可选服务,公开 TaskManager 端⼝以作为公共Kubernetes 节点的端⼝访问可查询状态
如果您为其创建 NodePort 服务,则可以访问 TaskManager 的可查询状态:
1. 运⾏为pod kubectl create -f taskmanager-query-state-service.yaml 创建 NodePort 服务 taskmanager 的例⼦ taskmanager-query-state-service.yaml 可以在附录中找到。
2. 运⾏ kubectl get svc flink-taskmanager-query-state 以获取 <node-port> 此服务的。然后你可以创建QueryableStateClient(, 来提交状态查询。
apiVersion: v1 kind: Service metadata: name: flink-taskmanager-query-state namespace: sd-bigdata spec: type: NodePort ports: - name: query-state port: 6125 targetPort: 6125 nodePort: 30025 selector: app: flink component: taskmanager
apiVersion: apps/v1 kind: Deployment metadata: name: flink-taskmanager namespace: sd-bigdata spec: replicas: 4 selector: matchLabels: app: flink component: taskmanager template: metadata: labels: app: flink component: taskmanager spec: hostAliases: - ip: "47.92.212.xx" hostnames: - "sd-kafka001" - "kafka001" - ip: "39.99.227.xx" hostnames: - "sd-kafka002" - "kafka002" - ip: "39.99.158.xx" hostnames: - "sd-kafka003" - "kafka003" - ip: "192.168.1.xx" hostnames: - "sd-cluster-03" - ip: "192.168.1.xx" hostnames: - "sd-cluster-04" - ip: "192.168.1.xx" hostnames: - "sd-cluster-05" containers: - name: taskmanager # image: registry-jf.sensedeal.wiki:9443/big-data/hdfs_flink:v8 image: registry-jf.sensedeal.wiki:9443/big-data/flink_k8s:v1.40 args: ["taskmanager"] command: ["/docker-entrypoint.sh"] ports: - containerPort: 6122 name: rpc - containerPort: 6125 name: query-state livenessProbe: tcpSocket: port: 6122 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: - name: flink-config-volume mountPath: /opt/flink/conf/ securityContext: runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary serviceAccountName: flink-service-account //必须配置 volumes: - name: flink-config-volume configMap: name: flink-config items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties path: log4j-console.properties
此外,您必须使⽤有权创建、编辑、删除 ConfigMap 的服务帐户启动 JobManager 和 TaskManagerpod。有关更多信息,有关更多信息,请参阅如何为 Pod 配置服务帐户。
当启⽤ High-Availability 时,Flink 将使⽤⾃⼰的 HA-services 进⾏服务发现。因此,JobManager pod应该以其 IP 地址⽽不是 Kubernetes 服务作为其 jobmanager.rpc.address .
4:serviceAccountName 需要运维配置:
apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: flink-sa namespace: default rules: - apiGroups: - '*' resources: - '*' verbs: - "*"
apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: flink-sa-default namespace: default roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: flink-sa subjects: - kind: ServiceAccount name: flink-service-account namespace: sd-bigdata
kubectl apply -f flink-configuration-configmap.yaml -n namespace kubectl apply -f jobmanager-rest-service.yaml -n namespace kubectl apply -f jobmanager-session-deployment-ha.yaml -n namespace kubectl apply -f taskmanager-query-state-service.yaml -n namespace kubectl apply -f taskmanager-session-deployment-ha.yaml -n namespace
kubectl delete -f flink-configuration-configmap.yaml -n namespace kubectl delete -f jobmanager-rest-service.yaml -n namespace kubectl delete -f jobmanager-session-deployment-ha.yaml -n namespace kubectl delete -f taskmanager-query-state-service.yaml -n namespace kubectl delete -f taskmanager-session-deployment-ha.yaml -n namespace
kubectl get pods -n namespace
kubectl get svc -n namespace
kubectl get all -n namespace