Flink这个问题怎么解决?Flink on yarn,任务代码里想通过jobListener监听任务状态,onJobSubmitted 和onJobExecuted同时触发,回调没生效。
针对 Flink on YARN 下,任务代码中通过 JobListener 监听任务状态,onJobSubmitted 和 onJobExecuted 同时触发的情况,您可以尝试以下几个解决方案:
在 Flink on YARN 中,可以通过 JobListener 监听任务的状态。然而,如果您发现 onJobSubmitted 和 onJobExecuted 方法同时触发,可能是由于一些原因造成的。
建议您检查以下几个方面:
要解决这个问题,您可以尝试以下方法:
确保您的Flink版本与YARN兼容。您可以查看Flink官方文档以获取有关兼容性的信息。
在您的任务代码中,确保您已经正确实现了JobListener
接口,并重写了onJobSubmitted
和onJobExecuted
方法。例如:
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobSubmissionContext;
import org.apache.flink.runtime.jobmaster.listener.JobListener;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.Failure;
import org.apache.flink.runtime.messages.Message;
import org.apache.flink.runtime.messages.Success;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.util.Preconditions;
public class MyJobListener implements JobListener {
private final RpcService rpcService;
private final String jobId;
public MyJobListener(RpcService rpcService, String jobId) {
this.rpcService = rpcService;
this.jobId = jobId;
}
@Override
public void onJobSubmitted(JobSubmissionContext context) throws Exception {
// 在这里处理任务提交事件
}
@Override
public void onJobExecutionResult(JobExecutionResult result) throws Exception {
// 在这里处理任务执行结果事件
}
@Override
public void onJobFailed(Throwable cause) throws Exception {
// 在这里处理任务失败事件
}
@Override
public void onJobCancelled() throws Exception {
// 在这里处理任务取消事件
}
@Override
public void onJobFinished(JobStatus status) throws Exception {
// 在这里处理任务完成事件
}
}
MyJobListener
实例注册到JobManager
:import org.apache.flink.api.common.JobVertexID;
import org.apache.flink.runtime.executiongraph.failover.flip1.graph.DefaultFailoverTopology;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
public class FlinkOnYarnExample {
public static void main(String[] args) throws Exception {
RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 8081);
SchedulerGateway schedulerGateway = rpcService.getSchedulerGateway();
TaskExecutorGateway taskExecutorGateway = rpcService.getTaskManagerGateway(new TaskManagerLocation("localhost", 8082));
// 创建一个简单的任务图
JobGraph jobGraph = new JobGraph("My Flink Job");
JobVertex jobVertex = new JobVertex("My Job Vertex");
jobGraph.addVertex(jobVertex);
// 注册监听器
MyJobListener myJobListener = new MyJobListener(rpcService, jobGraph.getJobID());
schedulerGateway.registerJobListener(myJobListener);
// 提交任务图
schedulerGateway.submitJob(jobGraph);
}
}
这样,当任务状态发生变化时,您的MyJobListener
中的相应方法将被调用。
抱歉哈,云上没有on yarn的环境,您应该是开源自建,请参考群公告进社区群。此回答整理自钉群“实时计算Flink产品交流群”
如果你想在 Flink on YARN 中监听任务状态,建议你可以采用以下方法:
你可以尝试使用其中一个或两个方法来实现任务状态监听。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。