开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问一下各位大佬,java 写的flink程序提交到yarn集群,application name都

请问一下各位大佬,java 写的flink程序提交到yarn集群,application name都是flink session cluster,多个程序运行的话,难以分辨,如何在代码里指定application的名字呢?多谢多谢啦 本问题来自阿里云开发者社区的【11大垂直技术领域开发者社群】。 点击这里欢迎加入感兴趣的技术领域群。

展开
收起
黄一刀 2019-12-05 13:35:46 3492 0
3 条回答
写回答
取消 提交回答
  • flink1.13.3源码:

    flink-yarn项目下的YarnClusterDescriptor类

    session pre-job yarn-application yarn的名称写死了。除非自己改源码编译jar包。 -ynm参数是无效的。我也烦死了。启动很多yarn,完全不知道那个是那个。

    为啥不用application-type来区分 三种启动模式。太无语了。

    package org.apache.flink.yarn;

    @Override

    public ClusterClientProvider deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {

        checkNotNull(clusterSpecification);
        checkNotNull(applicationConfiguration);
    
        final YarnDeploymentTarget deploymentTarget =
                YarnDeploymentTarget.fromConfig(flinkConfiguration);
        if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
            throw new ClusterDeploymentException(
                    "Couldn't deploy Yarn Application Cluster."
                            + " Expected deployment.target="
                            + YarnDeploymentTarget.APPLICATION.getName()
                            + " but actual one was \""
                            + deploymentTarget.getName()
                            + "\"");
        }
    
        applicationConfiguration.applyToConfiguration(flinkConfiguration);
    
        final List<String> pipelineJars =
                flinkConfiguration
                        .getOptional(PipelineOptions.JARS)
                        .orElse(Collections.emptyList());
        Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
    
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink Application Cluster",
                    YarnApplicationClusterEntryPoint.class.getName(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
        }
    }
    
    @Override
    public ClusterClientProvider<ApplicationId> deploySessionCluster(
            ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink session cluster",
                    getYarnSessionClusterEntrypoint(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
        }
    }
    
    @Override
    public ClusterClientProvider<ApplicationId> deployJobCluster(
            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
            throws ClusterDeploymentException {
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink per-job cluster",
                    getYarnJobClusterEntrypoint(),
                    jobGraph,
                    detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }
    
    2021-11-04 18:58:24
    赞同 展开评论 打赏
  • 可以试试在提交的时候加上名称: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.execute("tablejoindemo");

    2020-03-06 20:56:06
    赞同 展开评论 打赏
  • yarn任务可以设置名字啊

    2019-12-05 13:36:20
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载