准备配置文件(部署 Lyft flinkk8soperator v0.5.0)
crd.yaml
# CustomResourceDefinition for FlinkApplication (API group flink.k8s.io).
#
# Written against apiextensions.k8s.io/v1. The v1beta1 CRD API was removed in
# Kubernetes 1.22, so a v1beta1 manifest is rejected by current clusters.
# Differences from the v1beta1 form:
#   - schema, subresources and printer columns are declared per served
#     version; both versions share them through YAML anchors instead of
#     duplicating the whole schema;
#   - the schema is structural (every node declares a `type`, or is an
#     explicit `x-kubernetes-preserve-unknown-fields` node);
#   - v1 prunes fields that are not in the schema, whereas the v1beta1 CRD
#     (preserveUnknownFields defaulted to true) kept them. `status` and the
#     objects whose schema below is not exhaustive therefore set
#     `x-kubernetes-preserve-unknown-fields: true`;
#   - `nodeSelector` uses `additionalProperties` as a string map (the old
#     schema declared a property literally named "additionalProperties"),
#     and `flinkConfig` is a free-form map.
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: flinkapplications.flink.k8s.io
spec:
  group: flink.k8s.io
  names:
    kind: FlinkApplication
    listKind: FlinkApplicationList
    plural: flinkapplications
    singular: flinkapplication
    shortNames:
      - flinkapp
  scope: Namespaced
  versions:
    - name: v1beta1
      served: true
      # Objects are persisted in this version.
      storage: true
      subresources:
        status: {}
      additionalPrinterColumns: &flinkAppPrinterColumns
        - name: Phase
          type: string
          description: The current state machine phase for this FlinkApplication
          jsonPath: .status.phase
        - name: Cluster Health
          type: string
          description: The health of the Flink cluster
          jsonPath: .status.clusterStatus.health
        - name: Job Health
          type: string
          description: The health of the Flink job
          jsonPath: .status.jobStatus.health
        - name: Healthy TMs
          type: string
          jsonPath: .status.clusterStatus.healthyTaskManagers
          priority: 1
        - name: Total TMs
          type: string
          jsonPath: .status.clusterStatus.numberOfTaskManagers
          priority: 1
        - name: Job Restarts
          type: integer
          description: Number of times the job has restarted
          jsonPath: .status.jobStatus.jobRestartCount
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
      schema: &flinkAppSchema
        # openAPIV3Schema is the schema for validating custom objects.
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              # Unlisted spec fields are kept rather than pruned.
              x-kubernetes-preserve-unknown-fields: true
              required:
                - image
                - jarName
                - parallelism
                - entryClass
              properties:
                image:
                  type: string
                imagePullPolicy:
                  type: string
                  enum: [Always, Never, IfNotPresent]
                imagePullSecrets:
                  type: array
                  items:
                    type: object
                    properties:
                      name:
                        type: string
                serviceAccountName:
                  type: string
                securityContext:
                  type: object
                  # Only the fields below are validated; others are kept as-is.
                  x-kubernetes-preserve-unknown-fields: true
                  properties:
                    fsGroup:
                      type: integer
                      minimum: 1
                      maximum: 65535
                    runAsGroup:
                      type: integer
                      minimum: 1
                      maximum: 65535
                    runAsNonRoot:
                      type: boolean
                    runAsUser:
                      type: integer
                      minimum: 1
                      maximum: 65535
                    supplementalGroups:
                      type: array
                      items:
                        type: integer
                        minimum: 1
                        maximum: 65535
                jarName:
                  type: string
                programArgs:
                  type: string
                entryClass:
                  type: string
                flinkVersion:
                  type: string
                restartNonce:
                  type: string
                parallelism:
                  type: integer
                  minimum: 1
                deleteMode:
                  type: string
                  enum: [Savepoint, None, ForceCancel]
                allowNonRestoredState:
                  type: boolean
                deploymentMode:
                  type: string
                  enum: [Dual, BlueGreen]
                rpcPort:
                  type: integer
                  minimum: 1
                  maximum: 65535
                blobPort:
                  type: integer
                  minimum: 1
                  maximum: 65535
                queryPort:
                  type: integer
                  minimum: 1
                  maximum: 65535
                metricsQueryPort:
                  type: integer
                  minimum: 1
                  maximum: 65535
                flinkConfig:
                  # Free-form Flink configuration map; values are not
                  # type-checked (both numbers and strings are used).
                  type: object
                  x-kubernetes-preserve-unknown-fields: true
                savepointInfo:
                  type: object
                  x-kubernetes-preserve-unknown-fields: true
                  properties:
                    savepointLocation:
                      type: string
                savepointPath:
                  type: string
                savepointDisabled:
                  type: boolean
                maxCheckpointRestoreAgeSeconds:
                  type: integer
                  minimum: 1
                jobManagerConfig:
                  type: object
                  # Not exhaustive: unlisted fields are kept rather than pruned.
                  x-kubernetes-preserve-unknown-fields: true
                  properties:
                    replicas:
                      type: integer
                      minimum: 1
                    offHeapMemoryFraction:
                      type: number
                      minimum: 0
                      maximum: 1
                    nodeSelector:
                      # Map of string keys to string values.
                      type: object
                      additionalProperties:
                        type: string
                    tolerations:
                      type: array
                      items:
                        type: object
                        properties:
                          effect:
                            type: string
                          key:
                            type: string
                          operator:
                            type: string
                          tolerationSeconds:
                            type: integer
                            format: int64
                          value:
                            type: string
                    envConfig:
                      type: object
                      properties:
                        env:
                          type: array
                          items:
                            type: object
                            required:
                              - name
                            properties:
                              name:
                                type: string
                              value:
                                type: string
                              valueFrom:
                                type: object
                                properties:
                                  configMapKeyRef:
                                    type: object
                                    required:
                                      - key
                                    properties:
                                      key:
                                        type: string
                                      name:
                                        type: string
                                      optional:
                                        type: boolean
                                  fieldRef:
                                    type: object
                                    required:
                                      - fieldPath
                                    properties:
                                      apiVersion:
                                        type: string
                                      fieldPath:
                                        type: string
                                  resourceFieldRef:
                                    type: object
                                    required:
                                      - resource
                                    properties:
                                      containerName:
                                        type: string
                                      divisor:
                                        # Any JSON value (was `{}` in v1beta1).
                                        x-kubernetes-preserve-unknown-fields: true
                                      resource:
                                        type: string
                                  secretKeyRef:
                                    type: object
                                    required:
                                      - key
                                    properties:
                                      key:
                                        type: string
                                      name:
                                        type: string
                                      optional:
                                        type: boolean
                        envFrom:
                          type: array
                          items:
                            type: object
                            properties:
                              configMapRef:
                                type: object
                                properties:
                                  name:
                                    type: string
                                  optional:
                                    type: boolean
                              prefix:
                                type: string
                              secretRef:
                                type: object
                                properties:
                                  name:
                                    type: string
                                  optional:
                                    type: boolean
                    resources:
                      type: object
                      properties:
                        requests:
                          type: object
                          # Resource names not listed below are kept unvalidated.
                          x-kubernetes-preserve-unknown-fields: true
                          properties:
                            memory:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            cpu:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            ephemeral-storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                        limits:
                          type: object
                          # Resource names not listed below are kept unvalidated.
                          x-kubernetes-preserve-unknown-fields: true
                          properties:
                            memory:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            cpu:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            ephemeral-storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                taskManagerConfig:
                  type: object
                  # Not exhaustive: unlisted fields are kept rather than pruned.
                  x-kubernetes-preserve-unknown-fields: true
                  properties:
                    taskSlots:
                      type: integer
                      minimum: 1
                    offHeapMemoryFraction:
                      type: number
                      minimum: 0
                      maximum: 1
                    nodeSelector:
                      # Map of string keys to string values.
                      type: object
                      additionalProperties:
                        type: string
                    tolerations:
                      type: array
                      items:
                        type: object
                        properties:
                          effect:
                            type: string
                          key:
                            type: string
                          operator:
                            type: string
                          tolerationSeconds:
                            type: integer
                            format: int64
                          value:
                            type: string
                    envConfig:
                      type: object
                      properties:
                        env:
                          type: array
                          items:
                            type: object
                            required:
                              - name
                            properties:
                              name:
                                type: string
                              value:
                                type: string
                              valueFrom:
                                type: object
                                properties:
                                  configMapKeyRef:
                                    type: object
                                    required:
                                      - key
                                    properties:
                                      key:
                                        type: string
                                      name:
                                        type: string
                                      optional:
                                        type: boolean
                                  fieldRef:
                                    type: object
                                    required:
                                      - fieldPath
                                    properties:
                                      apiVersion:
                                        type: string
                                      fieldPath:
                                        type: string
                                  resourceFieldRef:
                                    type: object
                                    required:
                                      - resource
                                    properties:
                                      containerName:
                                        type: string
                                      divisor:
                                        # Any JSON value (was `{}` in v1beta1).
                                        x-kubernetes-preserve-unknown-fields: true
                                      resource:
                                        type: string
                                  secretKeyRef:
                                    type: object
                                    required:
                                      - key
                                    properties:
                                      key:
                                        type: string
                                      name:
                                        type: string
                                      optional:
                                        type: boolean
                        envFrom:
                          type: array
                          items:
                            type: object
                            properties:
                              configMapRef:
                                type: object
                                properties:
                                  name:
                                    type: string
                                  optional:
                                    type: boolean
                              prefix:
                                type: string
                              secretRef:
                                type: object
                                properties:
                                  name:
                                    type: string
                                  optional:
                                    type: boolean
                    resources:
                      type: object
                      properties:
                        requests:
                          type: object
                          # Resource names not listed below are kept unvalidated.
                          x-kubernetes-preserve-unknown-fields: true
                          properties:
                            memory:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            cpu:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            ephemeral-storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                        limits:
                          type: object
                          # Resource names not listed below are kept unvalidated.
                          x-kubernetes-preserve-unknown-fields: true
                          properties:
                            memory:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            cpu:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                            ephemeral-storage:
                              type: string
                              pattern: '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'
                volumes:
                  type: array
                  items:
                    type: object
                    # Only `name` is validated; the volume source fields are kept.
                    x-kubernetes-preserve-unknown-fields: true
                    required:
                      - name
                    properties:
                      name:
                        type: string
                volumeMounts:
                  type: array
                  items:
                    type: object
                    required:
                      - name
                      - mountPath
                    properties:
                      mountPath:
                        type: string
                      mountPropagation:
                        type: string
                      name:
                        type: string
                      readOnly:
                        type: boolean
                      subPath:
                        type: string
                      subPathExpr:
                        type: string
            status:
              # Written via the status subresource; not validated, never pruned.
              type: object
              x-kubernetes-preserve-unknown-fields: true
    # Older API version, still served. No conversion webhook is configured
    # (strategy defaults to None), so it shares the v1beta1 schema.
    - name: v1alpha1
      served: true
      storage: false
      subresources:
        status: {}
      additionalPrinterColumns: *flinkAppPrinterColumns
      schema: *flinkAppSchema
namespace.yaml
---
# Namespace that holds the operator Deployment, its ServiceAccount and ConfigMap.
apiVersion: v1
kind: Namespace
metadata:
  name: flink-operator
role.yaml
# ClusterRole listing the API access granted to the Flink operator.
# RBAC reference: https://kubernetes.io/docs/reference/access-authn-authz/rbac/
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: flinkoperator
rules:
  # Read-only access to pods.
  - apiGroups: [""]
    resources: [pods]
    verbs: [get, list, watch]
  # Manage services.
  - apiGroups: [""]
    resources: [services]
    verbs: [create, get, list, watch, update, delete]
  # Manage deployments and ingresses, including their status.
  - apiGroups: [extensions, apps]
    resources: [deployments, deployments/status, ingresses, ingresses/status]
    verbs: [get, list, watch, create, update, delete]
  # Record events.
  - apiGroups: [""]
    resources: [events]
    verbs: [create, update, patch]
  # Access CustomResourceDefinition objects.
  - apiGroups: [apiextensions.k8s.io]
    resources: [customresourcedefinitions]
    verbs: [get, list, watch, create, update]
  # FlinkApplication objects (flink.k8s.io), their status and finalizers.
  - apiGroups: [flink.k8s.io]
    resources:
      - flinkapplications
      - flinkapplications/status
      - flinkapplications/finalizers
    verbs: [get, list, watch, create, update, delete, patch]
---
# ServiceAccount that the operator Deployment runs as.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flinkoperator
  namespace: flink-operator
role-binding.yaml
# Bind the flinkoperator ClusterRole to the operator's ServiceAccount.
# Uses rbac.authorization.k8s.io/v1: v1beta1 was removed in Kubernetes 1.22
# (v1 has been available since Kubernetes 1.8).
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: flinkoperator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: flinkoperator
subjects:
  - kind: ServiceAccount
    name: flinkoperator
    namespace: flink-operator
config.yaml
# ConfigMap with the operator configuration; the operator Deployment mounts
# its `config` key as config.yaml.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-operator-config
  namespace: flink-operator
data:
  # this will need to be templatized
  # In `config`: replace {ingress_suffix} with your cluster's ingress domain;
  # per the note inside, no ingress is created when this is not set.
  config: |-
    operator:
      #替换{ingress_suffix}为你的集群ingress url,如不设置则无ingress
      ingressUrlFormat: "{{$jobCluster}}.{ingress_suffix}"
    logger:
      level: 4
flinkk8soperator.yaml
# Deployment running a single replica of the Lyft flinkk8soperator.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flinkoperator
  namespace: flink-operator
  labels:
    app: flinkoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flinkoperator
  template:
    metadata:
      labels:
        app: flinkoperator
        # Quoted so the version label is always read as a string.
        app.kubernetes.io/version: "0.5.0"
    spec:
      # ServiceAccount bound to the flinkoperator ClusterRole.
      serviceAccountName: flinkoperator
      volumes:
        # Exposes the `config` key of the operator ConfigMap as config.yaml.
        - name: config-volume
          configMap:
            name: flink-operator-config
            items:
              - key: config
                path: config.yaml
      containers:
        - name: flinkoperator-gojson
          image: docker.io/lyft/flinkk8soperator:v0.5.0
          command:
            - flinkoperator
          args:
            - --logtostderr
            - --config
            # NOTE(review): exec form, so no shell expands this glob; presumably
            # the operator's config loader resolves it — confirm.
            - "/etc/flinkoperator/config*/config.yaml"
          env:
            - name: OPERATOR_NAME
              value: flinkk8soperator
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 10254
          resources:
            requests:
              memory: "4Gi"
              cpu: "4"
            limits:
              # Binary unit, consistent with the request ("8G" would mean
              # 8*10^9 bytes, about 7.45Gi).
              memory: "8Gi"
              cpu: "8"
          volumeMounts:
            - name: config-volume
              mountPath: /etc/flinkoperator/config
flink-operator-custom-resource.yaml
# Sample FlinkApplication (WordCount) handled by the operator.
apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
  name: wordcount-operator-example
  namespace: flink-operator
  labels:
    environment: development
spec:
  # `{sha}` is a placeholder; substitute an existing tag of the example image.
  image: docker.io/lyft/wordcount-operator-example:{sha}
  deleteMode: None
  flinkConfig:
    taskmanager.heap.size: 200
    taskmanager.network.memory.fraction: 0.1
    taskmanager.network.memory.min: 10m
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
    web.upload.dir: /opt/flink
  jobManagerConfig:
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    replicas: 1
  taskManagerConfig:
    taskSlots: 2
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
  flinkVersion: "1.8"
  jarName: "wordcount-operator-example-1.0.0-SNAPSHOT.jar"
  parallelism: 3
  entryClass: "org.apache.flink.WordCount"
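按以下顺序依次创建资源(namespace.yaml 需先于 role.yaml、config.yaml 和 flinkk8soperator.yaml 执行,因为其中的资源都位于 flink-operator 命名空间):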
# Create the operator manifests one after another, in dependency order.
for manifest in crd.yaml namespace.yaml role.yaml role-binding.yaml config.yaml flinkk8soperator.yaml; do
  kubectl create -f "$manifest"
done
执行结束,即可查看相关pod
# List the operator pod, then read its logs
# (substitute {pod-name} with the name printed by the first command).
kubectl --namespace flink-operator get pods
kubectl --namespace flink-operator logs {pod-name}
运行一个案例
# Submit the sample FlinkApplication.
kubectl create --filename flink-operator-custom-resource.yaml
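官方部署方式(Apache Flink Kubernetes Operator)
注意:这是 Apache Flink 社区的 Operator,与上文 Lyft 的 flinkk8soperator 是两个不同的项目;前者使用 FlinkDeployment 资源,后者使用 FlinkApplication 资源,两者的 CRD 互不通用。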
前置要求:
- docker
- kubernetes
- helm
1:安装证书管理器(cert-manager)
# Install cert-manager v1.8.2 from its release manifest.
kubectl create --filename https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
如果 cert-manager 安装因任何原因失败,可以在执行 Operator 的 helm install 命令时传入 --set webhook.create=false 来禁用 webhook。
2: 通过 Helm 安装 Operator
# Operator release to install. The literal placeholder <OPERATOR-VERSION> must
# not be typed into a shell: `<` and `>` are parsed as redirections.
# (The example in step 4 uses the release-1.3 branch.)
# NOTE(review): downloads.apache.org only hosts current releases; older ones are
# under https://archive.apache.org/dist/flink/ — switch the URL if `repo add` fails.
OPERATOR_VERSION=1.3.1
helm repo add flink-operator-repo "https://downloads.apache.org/flink/flink-kubernetes-operator-${OPERATOR_VERSION}/"
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

# Alternative: install the chart from a local checkout (current directory) into
# namespace flink-cluster-test, with the CI values file and without the webhook:
# helm upgrade --install flink-kubernetes-operator . -n flink-cluster-test -f ./ci/values.yaml --set webhook.create=false
Helm chart 默认使用 ghcr.io/apache/flink-kubernetes-operator 镜像仓库。
如果访问该仓库存在网络问题,或更希望使用 Docker Hub,
可以在安装时传入 --set image.repository=apache/flink-kubernetes-operator。
3:使用 kubectl 和 helm 验证安装是否成功
$ kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
flink-kubernetes-operator-fb5d46f94-ghd8b   2/2     Running   0          4m21s

$ helm list
NAME                        NAMESPACE   REVISION   UPDATED                                STATUS     CHART                                    APP VERSION
flink-kubernetes-operator   default     1          2022-03-09 17:39:55.461359 +0100 CET   deployed   flink-kubernetes-operator-1.4-SNAPSHOT   1.4-SNAPSHOT
4: 提交flink任务
# Submit the basic example FlinkDeployment from the operator's release-1.3 branch.
kubectl create --filename https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.3/examples/basic.yaml
日志查看
$ kubectl logs -f deploy/basic-example
2022-03-11 21:46:04,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 206 (type=CHECKPOINT) @ 1647035164458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:04,465 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 206 for job a12c04ac7f5d8418d8ab27931bf517b7 (28509 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
2022-03-11 21:46:06,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 207 (type=CHECKPOINT) @ 1647035166458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:06,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 207 for job a12c04ac7f5d8418d8ab27931bf517b7 (28725 bytes, checkpointDuration=25 ms, finalizationTime=0 ms).
如需访问 Flink Web UI,可以添加端口转发规则(如下),或参考 Operator 的 Ingress 配置选项:
# Forward local port 8081 to port 8081 of the basic-example-rest service.
kubectl port-forward service/basic-example-rest 8081:8081
5: 停止并删除任务
# Delete the FlinkDeployment named basic-example.
kubectl delete flinkdeployment basic-example