开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

各位老师Flink中我在创建yarn客户端yarn.init和start方法都成功了,然后运行fli

各位老师Flink中我在创建yarn客户端yarn.init和start方法都成功了,然后运行flink任务也成功了,集群中没有生成新的appid是怎么回事,使用的是yarn perjob模式?private void internalSubmitJob(int step, String jobSteps, Long jobId) {

    Configuration flinkConfiguration = FlinkCommonConfiguration.getCommonFlinkConf();
    String configurationDirectory = FlinkCommonConfiguration.getConfigurationDirectory();

    //创建yarn客户点并初始化、启动
    org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
    //指定yarn-site.xml路径
    configuration.addResource(new Path("/xxxxxx/yarn-site.xml"));
    configuration.addResource(new Path("/xxxxxx/core-site.xml"));
    configuration.addResource(new Path("/xxxxxx/resource-types.xml"));
    configuration.addResource(new Path("/xxxxxx/mapred-site.xml"));

// String keytabPath = getClass().getClassLoader().getResource("appuser.keytab").getPath();
// String principal = "appuser"
//
// UserGroupInformation.loginUserFromKeytab("");

    YarnClient yarnClient = YarnClient.createYarnClient();
    YarnConfiguration yarnConfiguration = new YarnConfiguration(configuration);

    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    // 创建YarnClusterDescriptor
    YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);

    // 集群模式设置为per-job模式
    flinkConfiguration.set(
            DeploymentOptions.TARGET,
            YarnDeploymentTarget.PER_JOB.getName());


    // yarn application name 设置为flink-application
    flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");

    YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

    ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
            .createClusterSpecification();

    YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration, yarnConfiguration, yarnClient, clusterInformationRetriever, true);

    StreamGraph streamGraph;
    JobGraph jobGraph;

    // 替换为你的Flink作业jar文件路径
    try {
        streamGraph = FlinkJobRunner.execute(jobId);
        jobGraph = streamGraph.getJobGraph();
        //per job
        ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification,jobGraph, true);
        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        ApplicationId applicationId = clusterClient.getClusterId();
        String webInterfaceURL = clusterClient.getWebInterfaceURL();
        jobLifeCycleManager.launchNewJobLifeCycle(jobHandler.getJobDescriptor(), jobSteps);
        jobLifeCycleManager.setApplicationId(applicationId.toString());
        jobLifeCycleManager.setClusterClient(clusterClient);
    } catch (Exception e){

// jobLifeCycleManager.raiseException();
log.warn("sumbit异常");
throw new RuntimeException(e);
}

}这个是我具体实现方法

展开
收起
真的很搞笑 2023-07-18 21:43:33 77 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    Flink 版本和 YARN 版本是否兼容:在使用 Flink 运行在 YARN 集群上时,需要确保 Flink 版本和 YARN 版本兼容。您可以查看 Flink 官方文档中的版本兼容性矩阵,以确定 Flink 版本和 YARN 版本之间的兼容性。

    YARN 配置是否正确:在使用 Flink 运行在 YARN 集群上时,需要正确配置 YARN 的相关参数,例如 YARN 的资源管理器地址、队列名称、内存和 CPU 资源等。您可以检查 YARN 的配置文件,以确保相关参数正确配置。

    Flink 配置是否正确:在使用 Flink 运行在 YARN 集群上时,还需要正确配置 Flink 的相关参数,例如 Flink 的运行模式、作业部署方式、作业参数等。您可以检查 Flink 的配置文件,以确保相关参数正确配置。

    日志是否有异常信息:在 Flink 运行在 YARN 集群上时,还需要检查 Flink 和 YARN 的日志,以了解运行时的异常信息。您可以查看 Flink 和 YARN 的日志文件,以检查是否存在异常信息和错误提示

    2023-07-29 21:22:43
    赞同 展开评论 打赏
  • 在使用Flink的yarn-perjob模式时,如果在创建yarn客户端、初始化和启动方法都成功了,但集群中没有生成新的Application ID,可能有以下几个可能的原因:

    1. 提交任务的过程出现错误:尽管您在代码中执行了yarn.init和start方法成功,但可能在提交任务的过程中遇到了错误,导致未能生成新的Application ID。您可以检查提交任务的代码逻辑,确保没有遗漏或错误。

    2. 配置文件问题:请确保您在提交任务之前正确配置了相关的Flink和YARN参数。检查Flink的配置文件(如flink-conf.yaml)和YARN的配置文件(如yarn-site.xml),确保有正确设置相关的属性,如YARN管理器地址、资源分配等。

    3. YARN集群资源不足:如果YARN集群没有足够的资源来支持新的应用程序启动,可能会导致无法生成新的Application ID。您可以检查YARN集群的资源使用情况,确保有足够的容量来运行您的Flink任务。

    4. 网络连接问题:可能存在网络连接不稳定或延迟高的情况,导致与YARN集群通信失败。请确保您的网络连接正常,并且能够正常访问YARN集群。

    如果仍然无法解决问题,请提供更多详细的错误日志或代码片段,以便我们能够更好地理解和帮助您解决该问题。同时,您也可以在Flink社区的论坛或邮件列表中提问,以获取更多有关此问题的专业支持。

    2023-07-29 19:07:18
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载