开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink这个问题怎么解决?

Flink这个问题怎么解决?Flink on yarn,任务代码里想通过jobListener监听任务状态,onJobSubmitted 和onJobExecuted同时触发,回调没生效。

展开
收起
三分钟热度的鱼 2023-11-01 09:25:58 98 0
5 条回答
写回答
取消 提交回答
  • 针对 Flink on YARN 下,任务代码中通过 JobListener 监听任务状态,onJobSubmitted 和 onJobExecuted 同时触发的情况,您可以尝试以下几个解决方案:

    1. 检查 TaskManager 是否已经成功注册到 ResourceManager 中。由于 JobListener 监听的是 TaskManager 上的任务状态,因此只有当 TaskManager 成功注册到 ResourceManager 中时,才能正常接收任务状态变化事件。
    2. 检查任务运行环境是否正确配置。例如,检查 YARN 配置文件是否已正确设置,包括 ResourceManager 地址、RMProxy 类路径等。
    3. 如果您使用的是自定义的 JobListener 实现类,建议检查实现类中的代码逻辑是否存在错误。在执行 onJobSubmitted 方法时,应先判断当前任务是否已经处于 submitted 状态,以免重复触发回调方法。
    4. 检查 YARN 集群的资源是否充足,如果集群资源不足可能导致任务状态无法正常更新。
    5. 在使用 YARN 集群运行 Flink 任务时,建议使用 yarn-cluster 模式。在这种模式下,TaskManager 会直接在 YARN NodeManager 上运行,从而更加接近本地环境。
    2023-11-01 22:02:07
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在 Flink on YARN 中,可以通过 JobListener 监听任务的状态。然而,如果您发现 onJobSubmitted 和 onJobExecuted 方法同时触发,可能是由于一些原因造成的。
    建议您检查以下几个方面:

    • 您是否在同一时间只创建了一个 JobListener 实例?
    • 您是否正确实现了 JobListener 接口,并重写了所有必需的方法?
    • 您是否已经调用了 StreamExecutionEnvironment.addJobListener() 方法,以便将 JobListener 注册到流处理环境中?
    2023-11-01 13:26:30
    赞同 展开评论 打赏
  • 要解决这个问题,您可以尝试以下方法:

    1. 确保您的Flink版本与YARN兼容。您可以查看Flink官方文档以获取有关兼容性的信息。

    2. 在您的任务代码中,确保您已经正确实现了JobListener接口,并重写了onJobSubmittedonJobExecuted方法。例如:

    import org.apache.flink.runtime.jobgraph.JobStatus;
    import org.apache.flink.runtime.jobmaster.JobResult;
    import org.apache.flink.runtime.jobmaster.JobMaster;
    import org.apache.flink.runtime.jobmaster.JobSubmissionContext;
    import org.apache.flink.runtime.jobmaster.listener.JobListener;
    import org.apache.flink.runtime.messages.Acknowledge;
    import org.apache.flink.runtime.messages.Failure;
    import org.apache.flink.runtime.messages.Message;
    import org.apache.flink.runtime.messages.Success;
    import org.apache.flink.runtime.rpc.RpcService;
    import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
    import org.apache.flink.util.Preconditions;
    
    public class MyJobListener implements JobListener {
    
        private final RpcService rpcService;
        private final String jobId;
    
        public MyJobListener(RpcService rpcService, String jobId) {
            this.rpcService = rpcService;
            this.jobId = jobId;
        }
    
        @Override
        public void onJobSubmitted(JobSubmissionContext context) throws Exception {
            // 在这里处理任务提交事件
        }
    
        @Override
        public void onJobExecutionResult(JobExecutionResult result) throws Exception {
            // 在这里处理任务执行结果事件
        }
    
        @Override
        public void onJobFailed(Throwable cause) throws Exception {
            // 在这里处理任务失败事件
        }
    
        @Override
        public void onJobCancelled() throws Exception {
            // 在这里处理任务取消事件
        }
    
        @Override
        public void onJobFinished(JobStatus status) throws Exception {
            // 在这里处理任务完成事件
        }
    }
    
    1. 在您的Flink程序中,将MyJobListener实例注册到JobManager
    import org.apache.flink.api.common.JobVertexID;
    import org.apache.flink.runtime.executiongraph.failover.flip1.graph.DefaultFailoverTopology;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobStatus;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.jobmanager.scheduler.SchedulerGateway;
    import org.apache.flink.runtime.rpc.RpcService;
    import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
    import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
    
    public class FlinkOnYarnExample {
    
        public static void main(String[] args) throws Exception {
            RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 8081);
            SchedulerGateway schedulerGateway = rpcService.getSchedulerGateway();
            TaskExecutorGateway taskExecutorGateway = rpcService.getTaskManagerGateway(new TaskManagerLocation("localhost", 8082));
    
            // 创建一个简单的任务图
            JobGraph jobGraph = new JobGraph("My Flink Job");
            JobVertex jobVertex = new JobVertex("My Job Vertex");
            jobGraph.addVertex(jobVertex);
    
            // 注册监听器
            MyJobListener myJobListener = new MyJobListener(rpcService, jobGraph.getJobID());
            schedulerGateway.registerJobListener(myJobListener);
    
            // 提交任务图
            schedulerGateway.submitJob(jobGraph);
        }
    }
    

    这样,当任务状态发生变化时,您的MyJobListener中的相应方法将被调用。

    2023-11-01 11:56:58
    赞同 展开评论 打赏
  • 抱歉哈,云上没有on yarn的环境,您应该是开源自建,请参考群公告进社区群。此回答整理自钉群“实时计算Flink产品交流群”

    2023-11-01 10:06:18
    赞同 展开评论 打赏
  • 如果你想在 Flink on YARN 中监听任务状态,建议你可以采用以下方法:

    1. 使用 RichFunction 实现 JobExecutionResult 接口。这样,你就可以监听 Task Execution 结果。
    2. 使用 JobExecutionListener 接口来监听任务的状态。这个接口可以让你在任务开始、结束时获得通知,并捕获异常。
    3. 设置 OnTaskError 方法来处理任务执行期间发生的异常。

    你可以尝试使用其中一个或两个方法来实现任务状态监听。

    2023-11-01 09:37:06
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载