org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 6ccea31b5fa26415bd039f39859f2) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) at com.ebupt.QueryMetricsJob.job(QueryMetricsJob.java:71) at com.ebupt.QueryMetricsJob.main(QueryMetricsJob.java:41) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (6ccea31b5fa26415bd039f39859f2)] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
1.flink-conf.yaml配置
//指示是否从 Kerberos ticket 缓存中读取
security.kerberos.login.use-ticket-cache: false1
//Kerberos 密钥表文件的绝对路径
security.kerberos.login.keytab: /data/home/keytab/flink.keytab
//认证主体名称
security.kerberos.login.principal: flink@data.com
//Kerberos登陆contexts
security.kerberos.login.contexts: Client,KafkaClient
2.代码说明
val properties: Properties =new Properties()
properties.setProperty("bootstrap.servers","broker:9092")
properties.setProperty("group.id","testKafka")
properties.setProperty("security.protocol","SASL_PLAINTEXT")
properties.setProperty("sasl.mechanism","GSSAPI")
properties.setProperty("sasl.kerberos.service.name","kafka")
consumer =new FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties)
参数说明:security.protocol
运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。
来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。