flink-kubernetes-operator api使用(支持 jdk 1.8)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: flink-kubernetes-operator api使用(支持 jdk 1.8)

01 引言

前面写过有关flink-kubernetes-operator的相关文章,有兴趣的童鞋可以参考:

使用helm部署flink-kubernetes-operatork8s集群之后,接着项目引用其依赖就可以使用其提供的api去部署,依赖如下:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-kubernetes-operator -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-kubernetes-operator</artifactId>
    <version>版本号</version>
</dependency>

02 存在的问题

flink-kubernetes-operator源码地址: https://github.com/apache/flink-kubernetes-operator

但是,我们会发现一个问题,就是flink-kubernetes-operator最低支持jdk11版本,而我们的项目大部分都是jdk1.8,这确实有点坑。解决方案有两种:

  • 项目升级到jdk11或以上
  • 下载源码来修改

我都没使用以上的两种方法,而是根据实际需求,把关键的代码copy到本地,精简后的代码结构大致如下(需要代码的可以私聊我):
在这里插入图片描述

03 如何使用

到这里,我们比较关心的是如何调用里面的api。

其实简单看过其源码之后,会发现其最终是引用了farbric8开发的kubernetes-client来调用k8s提供的rest-api来对k8s集群的资源(resources)进行CRUD。

所以,第一步是引入kubernetes-client的依赖:

<dependency>
    <groupId>io.fabric8</groupId>
    <artifactId>kubernetes-client</artifactId>
    <version>6.3.1</version>
</dependency>

接着写K8sClient客户端工具类(当然也可以直接使用其API,仅仅是为了简化操作):

import cn.hutool.core.io.FileUtil;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Hashtable;
import java.util.Map;

/**
 * K8s 客户端
 *
 * @author : YangLinWei
 * @createTime: 2023/2/15 11:17
 * @version: 1.0.0
 */
public class K8sClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(K8sClient.class);

    private static K8sClient k8sClient;

    /**
     * cache k8s client
     * key-> kube config file path
     */
    private static Map<String, KubernetesClient> clientMap = new Hashtable<>();


    public static K8sClient INSTANCE() {
        if (k8sClient == null) {
            synchronized (K8sClient.class) {
                if (k8sClient == null) {
                    k8sClient = new K8sClient();
                }
            }
        }
        return k8sClient;
    }

    /**
     * get client
     *
     * @param configFilePath kube config filepath
     * @return kubernetes client
     */
    public KubernetesClient getClient(String configFilePath) {
        return getClient(configFilePath, false);
    }

    /**
     * get client
     *
     * @param configFilePath kube config filepath
     * @param update         new client if need updated
     * @return kubernetes client
     */
    private KubernetesClient getClient(String configFilePath, boolean update) {
        if (configFilePath == null || "".equals(configFilePath)) {
            return null;
        }
        if (update && clientMap.containsKey(configFilePath)) {
            clientMap.get(configFilePath).close();
            clientMap.remove(configFilePath);
        }

        if (clientMap.containsKey(configFilePath)) {
            return clientMap.get(configFilePath);
        }
        try {
            Config config = Config.fromKubeconfig(FileUtil.readUtf8String(configFilePath));
            DefaultKubernetesClient client = new DefaultKubernetesClient(config);
            clientMap.put(configFilePath, client);
            return client;
        } catch (Exception e) {
            LOGGER.error("Fail to get k8s ApiClient", e);
            throw new RuntimeException("Fail to get k8s ApiClient", e);
        }
    }

}

最后,我们看看单元测试工具类:

/**
 * Flink-Kubernetes-Operator Test.
 *
 * @author : YangLinWei
 * @createTime: 2023/2/16 09:27
 * @version: 1.0.0
 */
public class FlinkK8sOperatorTest {

    public static void main(String[] args) {

        // 1. 初始化客户端
        KubernetesClient client = K8sClient.INSTANCE().getClient("你的k8s配置文件路径");

        // 2. 定义flinkdeployment资源文件
        FlinkDeployment flinkDeployment = new FlinkDeployment();
        flinkDeployment.setApiVersion("flink.apache.org/v1beta1");
        flinkDeployment.setKind("FlinkDeployment");
        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setNamespace("default");
        objectMeta.setName("basic");
        flinkDeployment.setMetadata(objectMeta);
        FlinkDeploymentSpec flinkDeploymentSpec = new FlinkDeploymentSpec();
        flinkDeploymentSpec.setFlinkVersion(FlinkVersion.v1_15);
        flinkDeploymentSpec.setImage("flink:1.15");
        Map<String, String> flinkConfiguration = new HashMap<>();
        flinkConfiguration.put("taskmanager.numberOfTaskSlots", "1");
        flinkDeploymentSpec.setFlinkConfiguration(flinkConfiguration);
        flinkDeployment.setSpec(flinkDeploymentSpec);
        flinkDeploymentSpec.setServiceAccount("flink");
        JobManagerSpec jobManagerSpec = new JobManagerSpec();
        jobManagerSpec.setResource(new Resource(1.0, "1024m"));
        flinkDeploymentSpec.setJobManager(jobManagerSpec);
        TaskManagerSpec taskManagerSpec = new TaskManagerSpec();
        taskManagerSpec.setResource(new Resource(1.0, "1024m"));
        flinkDeploymentSpec.setTaskManager(taskManagerSpec);
        flinkDeployment
                .getSpec()
                .setJob(
                        JobSpec.builder()
                                .jarURI(
                                        "local:///opt/flink/examples/streaming/StateMachineExample.jar")
                                .parallelism(1)
                                .upgradeMode(UpgradeMode.STATELESS)
                                .build());

        // 3. 部署flinkdeployment资源文件至k8s集群
        client.resource(flinkDeployment).createOrReplace();

        // 4. 删除已部署的flinkdeployment
        //client.resource(flinkDeployment).delete();

    }
}

这样,就可以看到flink作业已经部署了:
在这里插入图片描述
也可以看到起flink web
在这里插入图片描述
当然,这里面用到了k8s一些相关的知识,继续讲讲。

04 k8s相关命令

获取集群信息命令:

kubectl cluster-info

在这里插入图片描述


导出k8s集群配置命令:

kubectl config view --raw > 本地存储配置文件的路径

获取pod和svc资源:

kubectl get pods -A|grep 你想要查询的pod
kubectl get svc -A|grep 你想要查询的service

暴露服务:

kubectl edit svc 你的service -n 命名空间

然后修改里面的typeNodePort即可。

ok,这里不再多说了,有兴趣的童鞋可以参考我的k8s专栏:https://blog.csdn.net/qq_20042935/category_9220572.html
在这里插入图片描述

05 文末

本文主要讲解了flink-kubernetes-operator api使用,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
7月前
|
Kubernetes API 网络安全
当node节点kubectl 命令无法连接到 Kubernetes API 服务器
当Node节点上的 `kubectl`无法连接到Kubernetes API服务器时,可以通过以上步骤逐步排查和解决问题。首先确保网络连接正常,验证 `kubeconfig`文件配置正确,检查API服务器和Node节点的状态,最后排除防火墙或网络策略的干扰,并通过重启服务恢复正常连接。通过这些措施,可以有效解决与Kubernetes API服务器通信的常见问题,从而保障集群的正常运行。
439 17
|
8月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
641 12
Flink CDC YAML:面向数据集成的 API 设计
|
7月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
213 5
|
12月前
|
存储 安全 Java
jdk21的外部函数和内存API(MemorySegment)(官方翻译)
本文介绍了JDK 21中引入的外部函数和内存API(MemorySegment),这些API使得Java程序能够更安全、高效地与JVM外部的代码和数据进行互操作,包括调用外部函数、访问外部内存,以及使用不同的Arena竞技场来分配和管理MemorySegment。
380 1
jdk21的外部函数和内存API(MemorySegment)(官方翻译)
|
12月前
|
Kubernetes 安全 Cloud Native
云上攻防-云原生篇&K8s安全-Kubelet未授权访问、API Server未授权访问
本文介绍了云原生环境下Kubernetes集群的安全问题及攻击方法。首先概述了云环境下的新型攻击路径,如通过虚拟机攻击云管理平台、容器逃逸控制宿主机等。接着详细解释了Kubernetes集群架构,并列举了常见组件的默认端口及其安全隐患。文章通过具体案例演示了API Server 8080和6443端口未授权访问的攻击过程,以及Kubelet 10250端口未授权访问的利用方法,展示了如何通过这些漏洞实现权限提升和横向渗透。
946 0
云上攻防-云原生篇&K8s安全-Kubelet未授权访问、API Server未授权访问
|
安全 Java API
【性能与安全的双重飞跃】JDK 22外部函数与内存API:JNI的继任者,引领Java新潮流!
【9月更文挑战第7天】JDK 22外部函数与内存API的发布,标志着Java在性能与安全性方面实现了双重飞跃。作为JNI的继任者,这一新特性不仅简化了Java与本地代码的交互过程,还提升了程序的性能和安全性。我们有理由相信,在外部函数与内存API的引领下,Java将开启一个全新的编程时代,为开发者们带来更加高效、更加安全的编程体验。让我们共同期待Java在未来的辉煌成就!
250 11
|
安全 Java API
【本地与Java无缝对接】JDK 22外部函数和内存API:JNI终结者,性能与安全双提升!
【9月更文挑战第6天】JDK 22的外部函数和内存API无疑是Java编程语言发展史上的一个重要里程碑。它不仅解决了JNI的诸多局限和挑战,还为Java与本地代码的互操作提供了更加高效、安全和简洁的解决方案。随着FFM API的逐渐成熟和完善,我们有理由相信,Java将在更多领域展现出其强大的生命力和竞争力。让我们共同期待Java编程新纪元的到来!
463 11
|
监控 Java 大数据
【Java内存管理新突破】JDK 22:细粒度内存管理API,精准控制每一块内存!
【9月更文挑战第9天】虽然目前JDK 22的确切内容尚未公布,但我们可以根据Java语言的发展趋势和社区的需求,预测细粒度内存管理API可能成为未来Java内存管理领域的新突破。这套API将为开发者提供前所未有的内存控制能力,助力Java应用在更多领域发挥更大作用。我们期待JDK 22的发布,期待Java语言在内存管理领域的持续创新和发展。
|
资源调度 Kubernetes API
在K8S中,能否实现不通过api-Server创建Pod?
在K8S中,能否实现不通过api-Server创建Pod?
|
Kubernetes 负载均衡 API
在K8S中,api-service 和 kube-schedule 高可用原理是什么?
在K8S中,api-service 和 kube-schedule 高可用原理是什么?

热门文章

最新文章