01 Introduction
I have written about flink-kubernetes-operator before; interested readers can refer to:
- "Basic Usage of flink-kubernetes-operator": https://blog.csdn.net/qq_20042935/article/details/129005302
After deploying flink-kubernetes-operator to the k8s cluster with Helm, a project only needs to reference its dependency in order to use the API it provides for deployments. The dependency is as follows:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-kubernetes-operator -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-kubernetes-operator</artifactId>
    <version>VERSION</version>
</dependency>
02 The Problem
Source code of flink-kubernetes-operator: https://github.com/apache/flink-kubernetes-operator
However, there is a catch: flink-kubernetes-operator requires at least JDK 11, while most of our projects are still on JDK 1.8, which is rather awkward. There are two possible solutions:
- upgrade the project to JDK 11 or above;
- download the source code and modify it.
I used neither of them. Instead, based on my actual needs, I copied the key classes into my own project. The trimmed-down code structure looks roughly like the sketch below (feel free to message me if you need the code):
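As a rough, hypothetical sketch only (inferred from the classes used in the test code later in this article, so your local copy may well differ), the copied classes boil down to the operator's CRD model:

FlinkDeployment                      // the FlinkDeployment custom resource class
FlinkDeploymentSpec                  // top-level spec: image, flinkVersion, flinkConfiguration, serviceAccount, ...
JobManagerSpec / TaskManagerSpec     // JobManager / TaskManager settings
Resource                             // cpu / memory resource settings
JobSpec                              // jarURI, parallelism, upgradeMode
FlinkVersion, UpgradeMode            // enums referenced by the spec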
03 How to Use It
At this point, what we care about most is how to call the API inside it. A quick look at the source code shows that it ultimately relies on the kubernetes-client developed by fabric8, which calls the REST API exposed by k8s to perform CRUD operations on the cluster's resources.
So the first step is to add the kubernetes-client dependency:
<dependency>
    <groupId>io.fabric8</groupId>
    <artifactId>kubernetes-client</artifactId>
    <version>6.3.1</version>
</dependency>
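With this dependency alone you can already talk to the cluster. The following is a minimal sketch of my own (not taken from the operator project) showing how fabric8's client is typically used; it assumes a reachable cluster and a kubeconfig at the default location (~/.kube/config):

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class FabricClientQuickStart {

    public static void main(String[] args) {
        // The builder auto-detects kubeconfig / in-cluster credentials
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Read: list the pods in the default namespace
            for (Pod pod : client.pods().inNamespace("default").list().getItems()) {
                System.out.println(pod.getMetadata().getName());
            }
        }
    }
}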
Next, write a K8sClient utility class (you could of course call the API directly; the wrapper merely simplifies things):
import cn.hutool.core.io.FileUtil;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Hashtable;
import java.util.Map;

/**
 * K8s client helper.
 *
 * @author : YangLinWei
 * @createTime: 2023/2/15 11:17
 * @version: 1.0.0
 */
public class K8sClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(K8sClient.class);

    // volatile is needed for the double-checked locking in INSTANCE() to be safe
    private static volatile K8sClient k8sClient;

    /**
     * Cache of k8s clients, keyed by kube config file path.
     */
    private static final Map<String, KubernetesClient> clientMap = new Hashtable<>();

    public static K8sClient INSTANCE() {
        if (k8sClient == null) {
            synchronized (K8sClient.class) {
                if (k8sClient == null) {
                    k8sClient = new K8sClient();
                }
            }
        }
        return k8sClient;
    }

    /**
     * Get a cached client.
     *
     * @param configFilePath kube config file path
     * @return kubernetes client
     */
    public KubernetesClient getClient(String configFilePath) {
        return getClient(configFilePath, false);
    }

    /**
     * Get a client, optionally rebuilding it.
     *
     * @param configFilePath kube config file path
     * @param update         rebuild the client if true
     * @return kubernetes client
     */
    private KubernetesClient getClient(String configFilePath, boolean update) {
        if (configFilePath == null || "".equals(configFilePath)) {
            return null;
        }
        // Close and evict the stale client if a rebuild is requested
        if (update && clientMap.containsKey(configFilePath)) {
            clientMap.get(configFilePath).close();
            clientMap.remove(configFilePath);
        }
        if (clientMap.containsKey(configFilePath)) {
            return clientMap.get(configFilePath);
        }
        try {
            // Build the client from the content of the kubeconfig file
            Config config = Config.fromKubeconfig(FileUtil.readUtf8String(configFilePath));
            DefaultKubernetesClient client = new DefaultKubernetesClient(config);
            clientMap.put(configFilePath, client);
            return client;
        } catch (Exception e) {
            LOGGER.error("Fail to get k8s ApiClient", e);
            throw new RuntimeException("Fail to get k8s ApiClient", e);
        }
    }
}
Finally, let's take a look at the test class:
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.util.HashMap;
import java.util.Map;

// FlinkDeployment, FlinkDeploymentSpec, JobManagerSpec, TaskManagerSpec, JobSpec,
// Resource, UpgradeMode and FlinkVersion are the operator CRD model classes copied
// locally (see section 02), so their import packages depend on your own layout.

/**
 * Flink-Kubernetes-Operator Test.
 *
 * @author : YangLinWei
 * @createTime: 2023/2/16 09:27
 * @version: 1.0.0
 */
public class FlinkK8sOperatorTest {

    public static void main(String[] args) {
        // 1. Initialize the client (use the path of your own kubeconfig file)
        KubernetesClient client = K8sClient.INSTANCE().getClient("path/to/your/kubeconfig");

        // 2. Define the FlinkDeployment resource
        FlinkDeployment flinkDeployment = new FlinkDeployment();
        flinkDeployment.setApiVersion("flink.apache.org/v1beta1");
        flinkDeployment.setKind("FlinkDeployment");

        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setNamespace("default");
        objectMeta.setName("basic");
        flinkDeployment.setMetadata(objectMeta);

        FlinkDeploymentSpec flinkDeploymentSpec = new FlinkDeploymentSpec();
        flinkDeploymentSpec.setFlinkVersion(FlinkVersion.v1_15);
        flinkDeploymentSpec.setImage("flink:1.15");
        Map<String, String> flinkConfiguration = new HashMap<>();
        flinkConfiguration.put("taskmanager.numberOfTaskSlots", "1");
        flinkDeploymentSpec.setFlinkConfiguration(flinkConfiguration);
        flinkDeployment.setSpec(flinkDeploymentSpec);
        flinkDeploymentSpec.setServiceAccount("flink");

        JobManagerSpec jobManagerSpec = new JobManagerSpec();
        jobManagerSpec.setResource(new Resource(1.0, "1024m"));
        flinkDeploymentSpec.setJobManager(jobManagerSpec);

        TaskManagerSpec taskManagerSpec = new TaskManagerSpec();
        taskManagerSpec.setResource(new Resource(1.0, "1024m"));
        flinkDeploymentSpec.setTaskManager(taskManagerSpec);

        flinkDeployment
                .getSpec()
                .setJob(
                        JobSpec.builder()
                                .jarURI("local:///opt/flink/examples/streaming/StateMachineExample.jar")
                                .parallelism(1)
                                .upgradeMode(UpgradeMode.STATELESS)
                                .build());

        // 3. Apply the FlinkDeployment resource to the k8s cluster
        client.resource(flinkDeployment).createOrReplace();

        // 4. Delete the deployed FlinkDeployment
        //client.resource(flinkDeployment).delete();
    }
}
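After the resource has been applied, you can also read it back through the same client to check how the operator is reconciling it. The snippet below is a sketch of my own, meant to be appended inside main() above; it reuses the client and flinkDeployment objects, and getStatus() assumes the locally copied FlinkDeployment class keeps the status field from the operator's CRD model:

// Read the FlinkDeployment back from the cluster; the operator writes its
// reconciliation progress into the status section of the resource.
FlinkDeployment current = client.resource(flinkDeployment).get();
if (current != null && current.getStatus() != null) {
    System.out.println("current status: " + current.getStatus());
}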
At this point, you can see that the Flink job has been deployed, and you can also access its Flink Web UI.
Of course, some k8s-related knowledge is involved here, so let's go over it briefly.
04 Related k8s Commands
Get cluster information:
kubectl cluster-info
Export the k8s cluster configuration (kubeconfig):
kubectl config view --raw > /path/to/local/kubeconfig
List pod and svc resources:
kubectl get pods -A | grep <the-pod-you-are-looking-for>
kubectl get svc -A | grep <the-service-you-are-looking-for>
Expose a service:
kubectl edit svc <your-service> -n <namespace>
Then change the type field inside to NodePort.
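If you prefer a non-interactive one-liner over kubectl edit, a patch does the same thing. The service name below (basic-rest, the REST service created for the "basic" deployment above) is an assumption for illustration; substitute your own service and namespace:
kubectl patch svc basic-rest -n default -p '{"spec":{"type":"NodePort"}}'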
OK, I will not go into more detail here; interested readers can check out my k8s column: https://blog.csdn.net/qq_20042935/category_9220572.html
05 Conclusion
This article mainly explained how to use the flink-kubernetes-operator API. I hope it helps. Thanks for reading, and that is all for this article!