Question 1: The Flink WebUI is started locally and the TaskManager Logs display fine, but clicking Stdout fails. What can I do?
Has anyone run into this? When the Flink WebUI is started locally, the TaskManager Logs tab displays fine, but clicking Stdout to view the output throws the following error:
ERROR (org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler:lambda$respondToRequest$1) - Failed to transfer file from TaskExecutor b5476663-a612-4946-a5ad-4a4b5e188981.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:647)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:260)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1275)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
at akka.dispatch.OnComplete.internal(Future.scala:299)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:2210)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Reference answer:
Add a logging configuration (either logback or log4j works), enable the file appender and logger, and then pass the log file path into the Configuration used to initialize the env.
1. Add the logging configuration, including the dependencies in the pom file and the logging configuration file under src/main/resources. The log4j2.xml below shows an example configuration:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/>
        <property name="LOG_LEVEL" value="INFO"/>
    </Properties>
    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>
    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>
</configuration>
```
2. When initializing the env in the Flink program, add the log-path configuration options, as shown below:
```java
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081");
conf.setString(WebOptions.LOG_PATH, "tmp/log/job.log");
conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "tmp/log/job.log");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
```
Then restart the program and the logs can be viewed normally in the WebUI.
For more answers to this question, see:
https://developer.aliyun.com/ask/639660
Question 2: The Flink job runs locally but not on the server
Flink is integrated into a Spring Boot project. When the project is started locally from IDEA, the Flink job runs fine, but after packaging it and running it on the server it fails. Flink 1.19, CDC 3.0.1.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-cdc-runtime -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-cdc-runtime</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-sqlserver-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-db2-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
```
[flink-pekko.actor.default-dispatcher-9] o.a.f.r.d.StandaloneDispatcher [Dispatcher.java : 1343] Job a9f375a2c1d30edda63a04b77c979cdc reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
Reference answer:
When a Flink job runs fine in IDEA locally but fails on the server, the cause is usually one of the following factors:
- Environment differences: make sure the Java environment on the server matches the local one, including the Java version and environment variables. Flink has specific Java requirements, and different Flink versions may need a specific Java runtime version.
- Dependency conflicts: the error `ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig` indicates a classpath problem or a dependency conflict. Check that the application package on the server is complete and that every dependency was packaged into it, with nothing missing. Also confirm that no other Flink version or conflicting library is on the server's classpath; a global Maven repository or old libraries on the server can sometimes interfere with your application.
- Flink configuration: check the Flink configuration file on the server (such as `flink-conf.yaml`) and make sure it is correct and matches the local development environment, especially the network, memory, and parallelism settings.
- Packaging: confirm that the build actually bundles all required dependencies. When packaging with Maven or Gradle, make sure the right command is used (for example Maven's `mvn clean package -DskipTests`) and check that the correct packaging type (`jar` or `war`) is used.
- Classpath issues: if the project has custom classes or dependencies, make sure they can also be loaded when running on the server. You may need to check the startup script or command and confirm that the CLASSPATH environment variable is set correctly (a small runtime check is sketched after this list).
- Permissions: although the error message does not point to it directly, permission problems on the server can also prevent a job from starting. Make sure the user running Flink has sufficient access to the relevant directories and resources.
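To quickly confirm the classpath points above, the following minimal sketch (an illustration, not part of the original answer; the class name is hypothetical) can be run inside the Spring Boot application on the server to check whether `org.apache.flink.api.common.ExecutionConfig` is actually visible at runtime and which jar it comes from:

```java
import java.security.CodeSource;

// Hypothetical diagnostic: verify that flink-core is on the runtime classpath.
public class FlinkClasspathCheck {
    public static void main(String[] args) {
        try {
            Class<?> clazz = Class.forName("org.apache.flink.api.common.ExecutionConfig");
            CodeSource src = clazz.getProtectionDomain().getCodeSource();
            // Printing the source jar helps spot version conflicts between bundled and server-side libraries.
            System.out.println("Found in: " + (src != null ? src.getLocation() : "<bootstrap/unknown>"));
        } catch (ClassNotFoundException e) {
            System.out.println("flink-core is missing from the runtime classpath: " + e.getMessage());
        }
    }
}
```

If the class cannot be found, the packaged artifact is missing flink-core; if it is found in an unexpected jar, there is likely a version conflict.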
Troubleshooting approach:
- Verify dependencies: first, inspect the project's dependency tree (with Maven's `mvn dependency:tree` or Gradle's `gradle dependencies`) to confirm that all dependencies are correct and conflict-free.
- Check the packaged artifact: unpack the application package on the server and confirm that all dependency libraries are included.
- Analyze the logs: read Flink's detailed logs on the server; they may contain more error information pointing to the root cause.
- Keep environments consistent: make sure the server's Java version, Flink configuration, and so on match the local development environment.
- Check the startup script: if a custom script starts the application, verify that it specifies the Java path and the Flink configuration path correctly and that the CLASSPATH is set properly.
If none of these steps solves the problem, consider replicating the server's Flink runtime environment locally for debugging, or provide the full error log for further analysis.
For more answers to this question, see:
https://developer.aliyun.com/ask/624247
Question 3: The Flink error output is too long to post in full; these two snippets are from the very top and the very bottom. Has anyone run into this problem?
Flink SQL 1.16. The SQL is quite long and contains a large number of CASE WHEN constants. Can this problem be solved with a configuration parameter?
Reference answer:
I just wrapped the logic in a UDF instead.
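The answer gives no code, but the idea is to move the long chain of CASE WHEN constants out of the SQL and into a scalar UDF, which keeps the planner-generated code for the query small. A minimal sketch, assuming a hypothetical code-to-label mapping (the class name and values below are illustrative, not from the original answer):

```java
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;

// Hypothetical UDF replacing a long CASE WHEN ... THEN '<constant>' chain.
public class CodeMappingFunction extends ScalarFunction {

    private static final Map<String, String> MAPPING = new HashMap<>();
    static {
        // Illustrative entries; in practice this holds whatever the CASE WHEN branches mapped.
        MAPPING.put("01", "created");
        MAPPING.put("02", "paid");
        MAPPING.put("03", "shipped");
    }

    public String eval(String code) {
        return MAPPING.getOrDefault(code, "unknown");
    }
}
```

After registering the function (for example with `CREATE TEMPORARY FUNCTION code_mapping AS 'com.example.CodeMappingFunction'`), the SQL calls `code_mapping(col)` instead of the long CASE WHEN expression.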
For more answers to this question, see:
https://developer.aliyun.com/ask/653349
Question 4: What is causing this Flink 1.19 build error?
What is causing this error when compiling Flink 1.19?
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:3.1.1:shade (shade-flink) on project flink-table-planner_2.12: Error creating shaded jar: Problem shading JAR D:\workspace\github\flink\flink-table\flink-table-planner\target\flink-table-planner_2.12-1.19-SNAPSHOT.jar entry org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class -> [Help 1]
Reference answer:
Upgrade JDK 1.8 to a recent release.
For more answers to this question, see:
https://developer.aliyun.com/ask/623578
Question 5: State TTL issue with Flink SQL CDC on Flink 1.15.4?
In Flink 1.15.4, Flink SQL CDC is used to sync MySQL data into Kafka. Once the SQL uses SET 'table.exec.state.ttl' = '48h', stopping the job manually and then restoring it from a checkpoint/savepoint makes it sync all the data into upsert-kafka from the beginning again. What is the reason? Does setting a TTL really mean the job can no longer sync incrementally after being restored? The state backend is RocksDB, and the Flink SQL parameters are as follows:
SET 'execution.checkpointing.interval' = '10s';
-- SET 'table.exec.state.ttl' = '48h';
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '10s';
SET 'table.exec.mini-batch.size' = '500';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
SET 'execution.checkpointing.max-concurrent-checkpoints'='1';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
Reference answer:
In Flink SQL, the `table.exec.state.ttl` option sets the lifetime of state. It affects the various kinds of state maintained by the job, including but not limited to the source reader's state. Once a piece of state exceeds its TTL (time to live), Flink clears it. This means that if a job is restarted for some reason (for example a manual stop) and the state it needs has already been cleaned up because it expired, the job may not be able to continue from where it left off, since some of the state it depends on may already be gone.
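For intuition about what a state TTL does, here is a minimal DataStream API sketch of the same mechanism (purely illustrative; roughly speaking, `table.exec.state.ttl` applies a comparable TTL to the keyed state created by the SQL operators):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlExample {
    // Illustrative: attach a 48h TTL to a piece of keyed state
    // (the descriptor would normally be created inside a RichFunction's open()).
    static ValueStateDescriptor<Long> buildDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(48))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("some-state", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```

Entries that exceed the TTL are eventually removed by the state backend, so they are no longer available after a later restore.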
In your specific scenario, `table.exec.state.ttl` is set to 48 hours, and after stopping the job you try to restore it from a checkpoint or savepoint. In theory, restoring from a checkpoint or savepoint should recover the job's execution state and continue the incremental sync, because the checkpoint/savepoint contains all the state needed for execution, independently of `table.exec.state.ttl`.
However, if the job still syncs data from the beginning after being restored from a checkpoint/savepoint, there are a few potential causes:
- Incomplete or corrupted checkpoint/savepoint: confirm that the saved checkpoint or savepoint contains all the necessary state information; it may have failed to complete correctly for various reasons (network problems, insufficient disk space, and so on).
- Configuration conflict or misunderstanding: although `table.exec.state.ttl` should not normally affect the ability to restore from a checkpoint/savepoint, some configuration misunderstanding or incompatibility could cause the state restore to be handled incorrectly.
- RocksDB state backend behavior: with RocksDB as the state backend, misconfiguration or a known bug can affect the restore logic, especially where TTL management and state cleanup are involved.
- Flink version specific issue: a bug or undocumented behavior specific to Flink 1.15.4 could cause this; check Flink's official issue tracker for related reports.
Suggested steps to resolve the problem:
- Check and validate the checkpoint/savepoint: make sure the one used for recovery is complete and valid.
- Read the official documentation and release notes: check the Flink 1.15.4 documentation and release notes for known issues or caveats related to `table.exec.state.ttl` and state recovery.
- Test different configuration combinations: try restoring the job without setting `table.exec.state.ttl` and see whether incremental sync works, to narrow down the problem (one way to pass the savepoint path when restoring is sketched after this list).
- Ask the community: describe the problem on the Flink user mailing list or a community forum; other users may have hit the same issue and found a solution.
- Upgrade Flink: if none of the above helps, consider upgrading to a newer Flink version in case the underlying bug has been fixed.
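For the restore test suggested above, the sketch below shows one way to pass the savepoint path when the job is submitted programmatically (an assumption about how the job is launched, not part of the original answer; the path is hypothetical). When submitting through the SQL Client, the corresponding setting is SET 'execution.savepoint.path' = '...'; before the INSERT statement:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RestoreFromSavepoint {
    public static void main(String[] args) {
        // Hypothetical savepoint location; use the path reported when the job was stopped.
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", "file:///tmp/flink-savepoints/savepoint-xxxx");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Apply the same SET options from the question via tableEnv.getConfig()
        // (with or without 'table.exec.state.ttl'), then submit the INSERT INTO statement.
    }
}
```

Comparing a restore with and without `table.exec.state.ttl` should show whether the TTL setting is really what breaks the incremental sync.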
For more answers to this question, see: