Flink 1.10 Native Kubernetes 原理与实践

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 在 1.10 版本完成了 Active Kubernetes Integration 的第一阶段,支持了 session clusters。后续的第二阶段会提供更完整的支持,如支持 per-job 任务提交,以及基于原生 Kubernetes API 的高可用,支持更多的 Kubernetes 参数如 toleration, label 和 node selector 等。

作者:周凯波(宝牛)

千呼万唤始出来,在 Kubernetes 如火如荼的今天,Flink 社区终于在 1.10 版本提供了对 Kubernetes 的原生支持,也就是 Native Kubernetes Integration。不过还只是Beta版本,预计会在 1.11 版本里面提供完整的支持。

我们知道,在 Flink 1.9 以及之前的版本里面,如果要在 Kubernetes 上运行 Flink 任务是需要事先指定好需要的 TaskManager(TM) 的个数以及CPU和内存的。这样的问题是:大多数情况下,你在任务启动前根本无法精确的预估这个任务需要多少个TM。如果指定的TM多了,会导致资源浪费;如果指定的TM个数少了,会导致任务调度不起来。本质原因是在 Kubernetes 上运行的 Flink 任务并没有直接向 Kubernetes 集群去申请资源。

Flink 在 1.10 版本完成了Active Kubernetes Integration的第一阶段,支持了 session clusters。后续的第二阶段会提供更完整的支持,如支持 per-job 任务提交,以及基于原生 Kubernetes API 的高可用,支持更多的 Kubernetes 参数如 toleration, label 和 node selector 等。Active Kubernetes Integration中的Active意味着 Flink 的 ResourceManager (KubernetesResourceManager) 可以直接和 Kubernetes 通信,按需申请新的 Pod,类似于 Flink 中对 Yarn 和 Mesos 的集成所做的那样。在多租户环境中,用户可以利用 Kubernetes 里面的 namespace 做资源隔离启动不同的 Flink 集群。当然,Kubernetes 集群中的用户帐号和赋权是需要提前准备好的。

原理

flink_1.10_nativek8s.png

工作原理如下(段首的序号对应图中箭头所示的数字):

  1. Flink 客户端首先连接 Kubernetes API Server,提交 Flink 集群的资源描述文件,包括 configmap,job manager service,job manager deployment 和 Owner Reference
  2. Kubernetes Master 就会根据这些资源描述文件去创建对应的 Kubernetes 实体。以我们最关心的 job manager deployment 为例,Kubernetes 集群中的某个节点收到请求后,Kubelet 进程会从中央仓库下载 Flink 镜像,准备和挂载 volume,然后执行启动命令。在 flink master 的 pod 启动后,Dispacher 和 KubernetesResourceManager 也都启动了。

前面两步完成后,整个 Flink session cluster 就启动好了,可以接受提交任务请求了。

  1. 用户可以通过 Flink 命令行即 flink client 往这个 session cluster 提交任务。此时 job graph 会在 flink client 端生成,然后和用户 jar 包一起通过 RestClinet 上传。
  2. 一旦 job 提交成功,JobSubmitHandler 收到请求就会提交 job 给 Dispatcher。接着就会生成一个 job master。
  3. JobMaster 向 KubernetesResourceManager 请求 slots。
  4. KubernetesResourceManager 从 Kubernetes 集群分配 TaskManager。每个TaskManager都是具有唯一表示的 Pod。KubernetesResourceManager 会为 TaskManager 生成一份新的配置文件,里面有 Flink Master 的 service name 作为地址。这样在 Flink Master failover之后,TaskManager 仍然可以重新连上。
  5. Kubernetes 集群分配一个新的 Pod 后,在上面启动 TaskManager。
  6. TaskManager 启动后注册到 SlotManager。
  7. SlotManager 向 TaskManager 请求 slots。
  8. TaskManager 提供 slots 给 JobMaster。然后任务就会被分配到这个 slots 上运行。

实践

Flink 的文档上对如何使用已经写的比较详细了,不过刚开始总会踩到一些坑。如果对 Kubernetes 不熟,可能会花点时间。

(1) 首先得有个 Kubernetes 集群,会有个 ~/.kube/config 文件。尝试执行 kubectl get nodes 看下集群是否正常。

如果没有这个 ~/.kube/config 文件,会报错:

2020-02-17 22:27:17,253 WARN  io.fabric8.kubernetes.client.Config                           - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli          - Error while running the Flink session.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Service]  with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c]  in namespace: [default]  failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known

(2) 提前创建好用户和赋权(RBAC)

kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink

如果没有创建用户,使用默认的用户去提交,会报错:

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes. 

Message: Forbidden!Configured service account doesn't have access. 
Service account may have been revoked. pods is forbidden: 
User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default".

(3) 这一步是可选的。默认情况下, JobManager 和 TaskManager 只会将 log 写到各自 pod 的 /opt/flink/log 。如果想通过 kubectl logs 看到日志,需要将 log 输出到控制台。要做如下修改 FLINK_HOME/conf 目录下的 log4j.properties 文件。

log4j.rootLogger=INFO, file, console

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

然后启动 session cluster 的命令行需要带上参数:

-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

(4) 终于可以开始启动 session cluster了。如下命令是启动一个每个 TaskManager 是4G内存,2个CPU,4个slot 的 session cluster。

bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4

更多的参数详见文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes

使用 kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f 就能看到日志了。

如果出现大量的这种日志(目前遇到是云厂商的LoadBalance liveness探测导致):

2020-02-17 14:58:56,323 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Unhandled exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)

可以暂时在 log4j.properties 里面配置上:

log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file

这个日志太多会导致 WebUI 上打开 jobmanger log 是空白,因为文件太大了前端无法显示。

如果前面第(1)和第(2)步没有做,会出现各种异常,通过 kubectl logs 就能很方便的看到日志了。

Session cluster 启动后可以通过 kubectl get pods,svc 来看是否正常。

通过端口转发来查看 Web UI:

kubectl port-forward service/kaibo-test 8081

打开 http://127.0.0.1:8001 就能看到 Flink 的 WebUI 了。

(5) 提交任务

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=kaibo-test examples/streaming/TopSpeedWindowing.jar

我们从 Flink WebUI 页面上可以看到,刚开始启动时,UI上显示 Total/Available Task Slots 为0, Task Managers 也是0。随着任务的提交,资源会动态增加。任务停止后,资源就会释放掉。

在提交任务后,通过 kubectl get pods 能够看到 Flink 为 TaskManager 分配了新的 Pod。

pods.png

(6) 停止 session cluster

echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true

也可以手工删除资源:

kubectl delete service/<ClusterID>

总结

可以看到,Flink 1.10 版本对和 Kubernetes 的集成做了很好的尝试。期待社区后续的 1.11 版本能对 per-job 提供支持,以及和 Kubernetes 的深度集成,例如基于原生 Kubernetes API 的高可用。最新进展请关注 FLINK-14460

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
481 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
3月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
208 9
Flink在B站的大规模云原生实践
|
4月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
443 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
4月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
239 9
网易游戏 Flink 云原生实践
|
5月前
|
存储 负载均衡 测试技术
ACK Gateway with Inference Extension:优化多机分布式大模型推理服务实践
本文介绍了如何利用阿里云容器服务ACK推出的ACK Gateway with Inference Extension组件,在Kubernetes环境中为多机分布式部署的LLM推理服务提供智能路由和负载均衡能力。文章以部署和优化QwQ-32B模型为例,详细展示了从环境准备到性能测试的完整实践过程。
|
6月前
|
存储 运维 BI
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
|
12月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
10月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3170 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
10月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
413 56

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多