
Is a flink.yarn.jars parameter supported?

Spark has a spark.yarn.jars parameter: the jars a job depends on can be placed directly on HDFS, which avoids uploading them from the local machine and then distributing them, and so speeds up job startup.
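For reference, what this boils down to on YARN is the LocalResource API: if a LocalResource points at a file that already lives on HDFS, the NodeManagers localize it directly from HDFS and the client never uploads anything. A minimal sketch of that registration step, assuming the standard Hadoop 2.x YARN client API (the helper name registerHdfsJar is made up for illustration, it is not a Flink or Spark method):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.util.ConverterUtils;
    import org.apache.hadoop.yarn.util.Records;

    // Register a jar that already lives on HDFS as a YARN LocalResource.
    // Size and timestamp must match the HDFS file exactly, otherwise the
    // NodeManager rejects the resource during localization.
    static LocalResource registerHdfsJar(FileSystem fs, Path hdfsJar) throws IOException {
        FileStatus status = fs.getFileStatus(hdfsJar);
        LocalResource resource = Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(hdfsJar));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        // PUBLIC lets NodeManagers cache the jar across applications,
        // which is what makes the shared-jars approach fast.
        resource.setVisibility(LocalResourceVisibility.PUBLIC);
        return resource;
    }

Spark exposes this via a config value such as spark.yarn.jars=hdfs:///spark/jars/*.jar (globs are allowed); the question is whether Flink has, or could have, an analogous switch.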

Here are my current changes in YarnClusterDescriptor.java:

    // upload and register ship files
    String systemJarHdfsDir = configuration.getString("stream.flink.system.jars.dir", "");
    List<String> systemClassPaths = findHdfsJars(fs, systemJarHdfsDir, paths, localResources, envShipFileList);

    String userJars = configuration.getString("stream.flink.use.jars", "");
    List<String> userClassPaths;
    if (userJars != null && !"".equals(userJars)) {
        userClassPaths = registerUserJars(fs, userJars.split(","), paths, localResources, envShipFileList);
    } else {
        userClassPaths = Collections.emptyList();
    }

    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
        systemClassPaths.addAll(userClassPaths);
    }

    // normalize classpath by sorting
    Collections.sort(systemClassPaths);
    Collections.sort(userClassPaths);

    // classpath assembler
    StringBuilder classPathBuilder = new StringBuilder();
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }
    for (String classPath : systemClassPaths) {
        classPathBuilder.append(classPath).append(File.pathSeparator);
    }
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }

    // Setup jar for ApplicationMaster
    Path remotePathJar = setupFlinkJar("flink.jar", fs, flinkJarPath, localResources);

*From the volunteer-curated Flink mailing list archive
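The findHdfsJars and registerUserJars calls above are helpers from this patch whose bodies are not included in the post. Here is a hedged sketch of what findHdfsJars might look like, reusing the registerHdfsJar sketch from earlier; the parameter types (List<Path>, Map<String, LocalResource>, StringBuilder) are assumptions about how YarnClusterDescriptor tracks resources, not the poster's actual code:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of the findHdfsJars helper called above. Lists
    // the jars under an HDFS directory and registers each one as a
    // LocalResource pointing at HDFS, so nothing is uploaded from the client.
    private List<String> findHdfsJars(
            FileSystem fs,
            String hdfsDir,
            List<Path> paths,
            Map<String, LocalResource> localResources,
            StringBuilder envShipFileList) throws IOException {
        List<String> classPaths = new ArrayList<>();
        if (hdfsDir == null || hdfsDir.isEmpty()) {
            return classPaths;
        }
        for (FileStatus status : fs.listStatus(new Path(hdfsDir))) {
            Path jar = status.getPath();
            if (!status.isFile() || !jar.getName().endsWith(".jar")) {
                continue;
            }
            paths.add(jar);
            // see the registerHdfsJar sketch earlier in this thread
            localResources.put(jar.getName(), registerHdfsJar(fs, jar));
            envShipFileList.append(jar.getName()).append(',');
            classPaths.add(jar.getName());
        }
        return classPaths;
    }

registerUserJars would presumably do the same for an explicit comma-separated list of jar paths instead of a directory listing.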

彗星halation 2021-12-02 17:52:35
1 reply
  • Yes, there is interest in this. The feature has already been implemented internally at Tencent, and I recall that Yang Wang (in cc) built something similar at Alibaba. To do it cleanly we would probably also need to tidy up the YarnClusterDescriptor code along the way. This requirement does come up frequently, so we will try to get it into 1.11.

    You could also describe the desired behavior in more detail, or implement it yourself and the community will help review it. I don't remember whether a JIRA ticket exists for this; you can search for one, or simply create a new one. *From the volunteer-curated Flink mailing list archive

    2021-12-02 18:18:17