有大佬遇到过,flink本地启动webUI,TaskManager的Logs日志可以正常查看,但是点击Stdout查看结果输出就报错? ERROR (org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler:lambda$respondToRequest$1) - Failed to transfer file from TaskExecutor b5476663-a612-4946-a5ad-4a4b5e188981.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:647)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1275)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
at akka.dispatch.OnComplete.internal(Future.scala:299)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:2210)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
这个错误信息表明 Flink Web UI 尝试从 TaskExecutor 请求标准输出(Stdout)文件,但是该文件在 TaskExecutor 上不存在。这可能是由于多种原因造成的。以下是一些可能的解决方案和检查步骤:
taskmanager.stdout.path: /path/to/stdout
taskmanager.log.path: /path/to/log
jobmanager.stdout.path: /path/to/stdout
jobmanager.log.path: /path/to/log
确保这些路径指向正确的目录,并且 TaskManager 有权限写入这些目录。
在 Apache Flink 的本地启动环境中,报错消息表明在尝试通过 Web UI 查看 TaskManager 的 Stdout 时出现了问题,错误信息是 The file STDOUT does not exist on the TaskExecutor。这种情况通常是因为 Flink 没有找到 TaskManager 的标准输出文件。以下是一些可能的原因和解决方法:
可能的原因
标准输出重定向:默认情况下,TaskManager 的标准输出可能没有被重定向到文件。
文件路径配置错误:Flink 配置文件中的日志路径配置可能有误。
权限问题:Flink 进程可能没有权限写入标准输出文件。
日志配置问题:Flink 的日志配置文件(例如 log4j 配置)可能没有正确配置标准输出的记录。
解决方法
taskmanager.log.file: /path/to/your/log/taskmanager.log
log4j.properties 示例
log4j.rootLogger=INFO, file, console
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=${flink.log.dir}/taskmanager.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{1} - %m%n
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{1} - %m%n
log4j2.xml 示例
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} [%t] %-5p %c{1} - %m%n"/>
</Console>
<File name="File" fileName="${sys:flink.log.dir}/taskmanager.log">
<PatternLayout pattern="%d{ISO8601} [%t] %-5p %c{1} - %m%n"/>
</File>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="File"/>
</Root>
</Loggers>
</Configuration>
确认 TaskManager 启动参数
确保 TaskManager 启动时没有重定向标准输出。例如,在启动 TaskManager 时,不要使用 &> /dev/null 或类似的重定向命令。
检查权限
确保 Flink 进程有权限写入标准输出文件所在的目录。
chmod -R 755 /path/to/your/log/
ls /path/to/your/log/
总结
通过以上步骤,确保 Flink 的日志配置正确,并且有权限写入标准输出文件,然后重新启动 Flink 集群。这样应该可以解决在 Web UI 中无法查看 TaskManager 标准输出的问题。如果问题依然存在,可以查看 Flink 的详细日志以获取更多的错误信息,从而进一步排查问题。
遇到您描述的问题,即在Flink本地启动WebUI后,能够查看TaskManager的Logs日志,但点击Stdout查看输出时出现错误,这可能是由于以下几个原因造成的:
日志文件不存在:错误信息指出"The file STDOUT does not exist on the TaskExecutor",表明TaskExecutor上没有STDOUT日志文件。这可能是因为TaskManager尚未完全启动完成,或者在配置中没有正确设置以保存STDOUT日志到预期位置。
日志配置问题:确保Flink配置中正确设置了日志输出到文件系统、OSS或SLS等位置,特别是确保STDOUT和STDERR的日志捕获是启用的。默认情况下,Flink配置会保留一定数量和大小的日志文件,但不修改此配置并不直接导致此问题,关键是确认日志输出路径及目标系统(如本地磁盘、OSS)是否配置正确且可访问。
TaskExecutor状态:根据注意事项,如果Task Manager和Job Manager没有正常启动,日志可能无法正确写入SLS或OSS。因此,确认这两个组件的状态是关键。如果它们中的任何一个没有成功启动,可能会导致日志文件缺失。
权限与路径问题:检查TaskManager运行时的用户是否有权限访问日志存储目录,以及目录路径是否配置正确无误。
在尝试通过 Flink Web UI 查看 TaskManager 的 STDOUT 日志时遇到的问题是因为 STDOUT 文件不存在于 TaskExecutor 上。
貌似是由于STDOUT文件在TaskExecutor上不存在导致的。具体错误信息表明了一个FlinkException,指出文件“STDOUT”在TaskExecutor上未找到
你去检查TaskManager配置,特别是日志相关设置,确认 Stdout 日志被正确地定向和保留。Flink允许通过配置文件(如 flink-conf.yaml)来调整日志行为,包括日志目录和级别等。
引入日志配置(logback、log4j都可以),开启日志文件的appender和logger,然后将日志文件路径引入到初始化env的conf中就可以了。
1、引入日志配置,包括pom文件中的依赖和 src/main/resources 目录下的日志文职文件。下面以log4j2.xml为例展示日志配置:
<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
<Properties>
<property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
<property name="LOG_LEVEL" value="INFO" />
</Properties>
<appenders>
<console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="${LOG_PATTERN}"/>
<ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
</console>
<File name="log" fileName="tmp/log/job.log" append="false">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
</File>
</appenders>
<loggers>
<root level="${LOG_LEVEL}">
<appender-ref ref="console"/>
<appender-ref ref="log"/>
</root>
</loggers>
</configuration>
2、在flink程序开始初始化env时增加log_path相关的配置项,如下所示:
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT,"8081");
conf.setString(WebOptions.LOG_PATH,"tmp/log/job.log");
conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY,"tmp/log/job.log");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
然后重新启动程序就可以WebUI正常查看日志了,如图:
——参考链接。
遇到这个问题,通常意味着Flink TaskManager侧没有生成或找不到期望的标准输出(STDOUT)日志文件。这可能是由几个原因造成的:
日志配置问题:Flink允许用户自定义日志框架和配置。请检查你的flink-conf.yaml
配置文件,确保日志配置正确无误,特别是与日志输出路径和类型相关的设置。默认情况下,Flink应该配置为输出到文件系统,并且这些日志应当包含stdout和stderr。
TaskManager的日志目录权限:确保TaskManager运行时使用的用户具有写入日志目录的权限。如果权限不足,日志文件可能无法被创建。
Flink运行模式:在某些特定的运行模式下(比如某些轻量级的本地测试配置),日志收集机制可能与生产环境有所不同,导致stdout/stderr日志处理异常。确认你的本地环境配置是否完全符合预期。
日志回滚策略:检查日志回滚策略是否可能导致旧的日志文件被删除。例如,如果日志rotate频繁且策略设置过于严格,可能在你尝试查看时文件已被移除。
Flink版本问题:确认你使用的Flink版本没有已知的日志处理问题。查阅相关版本的发行说明或已知问题列表,看是否有相似的报告或已修复的bug。
查看TaskManager日志:直接查看TaskManager的日志输出(通常是taskmanager.log
),可能会有更多关于为什么STDOUT文件不存在的线索。
环境变量或系统配置:检查是否有环境变量或系统配置影响了日志的输出行为,例如Java的系统属性或环境变量LOG_DIR
等。
为了解决这个问题,你可以尝试以下步骤:
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。