flink-kubernetes-operator api使用(支持 jdk 1.8)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink-kubernetes-operator api使用(支持 jdk 1.8)

01 引言

前面写过有关flink-kubernetes-operator的相关文章,有兴趣的童鞋可以参考:

使用helm部署flink-kubernetes-operatork8s集群之后,接着项目引用其依赖就可以使用其提供的api去部署,依赖如下:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-kubernetes-operator -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-kubernetes-operator</artifactId>
    <version>版本号</version>
</dependency>

02 存在的问题

flink-kubernetes-operator源码地址: https://github.com/apache/flink-kubernetes-operator

但是,我们会发现一个问题,就是flink-kubernetes-operator最低支持jdk11版本,而我们的项目大部分都是jdk1.8,这确实有点坑。解决方案有两种:

  • 项目升级到jdk11或以上
  • 下载源码来修改

我都没使用以上的两种方法,而是根据实际需求,把关键的代码copy到本地,精简后的代码结构大致如下(需要代码的可以私聊我):
在这里插入图片描述

03 如何使用

到这里,我们比较关心的是如何调用里面的api。

其实简单看过其源码之后,会发现其最终是引用了farbric8开发的kubernetes-client来调用k8s提供的rest-api来对k8s集群的资源(resources)进行CRUD。

所以,第一步是引入kubernetes-client的依赖:

<dependency>
    <groupId>io.fabric8</groupId>
    <artifactId>kubernetes-client</artifactId>
    <version>6.3.1</version>
</dependency>

接着写K8sClient客户端工具类(当然也可以直接使用其API,仅仅是为了简化操作):

import cn.hutool.core.io.FileUtil;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Hashtable;
import java.util.Map;

/**
 * K8s 客户端
 *
 * @author : YangLinWei
 * @createTime: 2023/2/15 11:17
 * @version: 1.0.0
 */
public class K8sClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(K8sClient.class);

    private static K8sClient k8sClient;

    /**
     * cache k8s client
     * key-> kube config file path
     */
    private static Map<String, KubernetesClient> clientMap = new Hashtable<>();


    public static K8sClient INSTANCE() {
        if (k8sClient == null) {
            synchronized (K8sClient.class) {
                if (k8sClient == null) {
                    k8sClient = new K8sClient();
                }
            }
        }
        return k8sClient;
    }

    /**
     * get client
     *
     * @param configFilePath kube config filepath
     * @return kubernetes client
     */
    public KubernetesClient getClient(String configFilePath) {
        return getClient(configFilePath, false);
    }

    /**
     * get client
     *
     * @param configFilePath kube config filepath
     * @param update         new client if need updated
     * @return kubernetes client
     */
    private KubernetesClient getClient(String configFilePath, boolean update) {
        if (configFilePath == null || "".equals(configFilePath)) {
            return null;
        }
        if (update && clientMap.containsKey(configFilePath)) {
            clientMap.get(configFilePath).close();
            clientMap.remove(configFilePath);
        }

        if (clientMap.containsKey(configFilePath)) {
            return clientMap.get(configFilePath);
        }
        try {
            Config config = Config.fromKubeconfig(FileUtil.readUtf8String(configFilePath));
            DefaultKubernetesClient client = new DefaultKubernetesClient(config);
            clientMap.put(configFilePath, client);
            return client;
        } catch (Exception e) {
            LOGGER.error("Fail to get k8s ApiClient", e);
            throw new RuntimeException("Fail to get k8s ApiClient", e);
        }
    }

}

最后,我们看看单元测试工具类:

/**
 * Flink-Kubernetes-Operator Test.
 *
 * @author : YangLinWei
 * @createTime: 2023/2/16 09:27
 * @version: 1.0.0
 */
public class FlinkK8sOperatorTest {

    public static void main(String[] args) {

        // 1. 初始化客户端
        KubernetesClient client = K8sClient.INSTANCE().getClient("你的k8s配置文件路径");

        // 2. 定义flinkdeployment资源文件
        FlinkDeployment flinkDeployment = new FlinkDeployment();
        flinkDeployment.setApiVersion("flink.apache.org/v1beta1");
        flinkDeployment.setKind("FlinkDeployment");
        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setNamespace("default");
        objectMeta.setName("basic");
        flinkDeployment.setMetadata(objectMeta);
        FlinkDeploymentSpec flinkDeploymentSpec = new FlinkDeploymentSpec();
        flinkDeploymentSpec.setFlinkVersion(FlinkVersion.v1_15);
        flinkDeploymentSpec.setImage("flink:1.15");
        Map<String, String> flinkConfiguration = new HashMap<>();
        flinkConfiguration.put("taskmanager.numberOfTaskSlots", "1");
        flinkDeploymentSpec.setFlinkConfiguration(flinkConfiguration);
        flinkDeployment.setSpec(flinkDeploymentSpec);
        flinkDeploymentSpec.setServiceAccount("flink");
        JobManagerSpec jobManagerSpec = new JobManagerSpec();
        jobManagerSpec.setResource(new Resource(1.0, "1024m"));
        flinkDeploymentSpec.setJobManager(jobManagerSpec);
        TaskManagerSpec taskManagerSpec = new TaskManagerSpec();
        taskManagerSpec.setResource(new Resource(1.0, "1024m"));
        flinkDeploymentSpec.setTaskManager(taskManagerSpec);
        flinkDeployment
                .getSpec()
                .setJob(
                        JobSpec.builder()
                                .jarURI(
                                        "local:///opt/flink/examples/streaming/StateMachineExample.jar")
                                .parallelism(1)
                                .upgradeMode(UpgradeMode.STATELESS)
                                .build());

        // 3. 部署flinkdeployment资源文件至k8s集群
        client.resource(flinkDeployment).createOrReplace();

        // 4. 删除已部署的flinkdeployment
        //client.resource(flinkDeployment).delete();

    }
}

这样,就可以看到flink作业已经部署了:
在这里插入图片描述
也可以看到起flink web
在这里插入图片描述
当然,这里面用到了k8s一些相关的知识,继续讲讲。

04 k8s相关命令

获取集群信息命令:

kubectl cluster-info

在这里插入图片描述


导出k8s集群配置命令:

kubectl config view --raw > 本地存储配置文件的路径

获取pod和svc资源:

kubectl get pods -A|grep 你想要查询的pod
kubectl get svc -A|grep 你想要查询的service

暴露服务:

kubectl edit svc 你的service -n 命名空间

然后修改里面的typeNodePort即可。

ok,这里不再多说了,有兴趣的童鞋可以参考我的k8s专栏:https://blog.csdn.net/qq_20042935/category_9220572.html
在这里插入图片描述

05 文末

本文主要讲解了flink-kubernetes-operator api使用,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
12天前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
321 12
Flink CDC YAML:面向数据集成的 API 设计
|
4月前
|
Kubernetes 安全 Cloud Native
云上攻防-云原生篇&K8s安全-Kubelet未授权访问、API Server未授权访问
本文介绍了云原生环境下Kubernetes集群的安全问题及攻击方法。首先概述了云环境下的新型攻击路径,如通过虚拟机攻击云管理平台、容器逃逸控制宿主机等。接着详细解释了Kubernetes集群架构,并列举了常见组件的默认端口及其安全隐患。文章通过具体案例演示了API Server 8080和6443端口未授权访问的攻击过程,以及Kubelet 10250端口未授权访问的利用方法,展示了如何通过这些漏洞实现权限提升和横向渗透。
369 0
云上攻防-云原生篇&K8s安全-Kubelet未授权访问、API Server未授权访问
|
4月前
|
存储 安全 Java
jdk21的外部函数和内存API(MemorySegment)(官方翻译)
本文介绍了JDK 21中引入的外部函数和内存API(MemorySegment),这些API使得Java程序能够更安全、高效地与JVM外部的代码和数据进行互操作,包括调用外部函数、访问外部内存,以及使用不同的Arena竞技场来分配和管理MemorySegment。
121 1
jdk21的外部函数和内存API(MemorySegment)(官方翻译)
|
5月前
|
监控 Java 大数据
【Java内存管理新突破】JDK 22:细粒度内存管理API,精准控制每一块内存!
【9月更文挑战第9天】虽然目前JDK 22的确切内容尚未公布,但我们可以根据Java语言的发展趋势和社区的需求,预测细粒度内存管理API可能成为未来Java内存管理领域的新突破。这套API将为开发者提供前所未有的内存控制能力,助力Java应用在更多领域发挥更大作用。我们期待JDK 22的发布,期待Java语言在内存管理领域的持续创新和发展。
|
5月前
|
安全 Java API
【性能与安全的双重飞跃】JDK 22外部函数与内存API:JNI的继任者,引领Java新潮流!
【9月更文挑战第7天】JDK 22外部函数与内存API的发布,标志着Java在性能与安全性方面实现了双重飞跃。作为JNI的继任者,这一新特性不仅简化了Java与本地代码的交互过程,还提升了程序的性能和安全性。我们有理由相信,在外部函数与内存API的引领下,Java将开启一个全新的编程时代,为开发者们带来更加高效、更加安全的编程体验。让我们共同期待Java在未来的辉煌成就!
119 11
|
5月前
|
安全 Java API
【本地与Java无缝对接】JDK 22外部函数和内存API:JNI终结者,性能与安全双提升!
【9月更文挑战第6天】JDK 22的外部函数和内存API无疑是Java编程语言发展史上的一个重要里程碑。它不仅解决了JNI的诸多局限和挑战,还为Java与本地代码的互操作提供了更加高效、安全和简洁的解决方案。随着FFM API的逐渐成熟和完善,我们有理由相信,Java将在更多领域展现出其强大的生命力和竞争力。让我们共同期待Java编程新纪元的到来!
201 11
|
6月前
|
Kubernetes 负载均衡 API
在K8S中,api-service 和 kube-schedule 高可用原理是什么?
在K8S中,api-service 和 kube-schedule 高可用原理是什么?
|
6月前
|
资源调度 Kubernetes API
在K8S中,能否实现不通过api-Server创建Pod?
在K8S中,能否实现不通过api-Server创建Pod?
|
6月前
|
Kubernetes 监控 API
在k8S中,各模块如何与API Server进行通信的?
在k8S中,各模块如何与API Server进行通信的?
|
6月前
|
存储 Kubernetes 负载均衡
在K8S中,api-server究竟是如何实现高可用?
在K8S中,api-server究竟是如何实现高可用?

热门文章

最新文章