Question 1: Flink SQL throws an error when mini-batch is enabled
Flink version: 1.15
The following settings were used in code to enable mini-batch mode:
tableEnv.getConfig().set("table.exec.mini-batch.enabled", "true");      // turn mini-batch on
tableEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5s");  // flush a buffered batch at least every 5s
tableEnv.getConfig().set("table.exec.mini-batch.size", "100");          // or as soon as 100 records are buffered
The query logic is as follows:
select
    max(min_collect_time) as min_collect_time,
    min(max_collect_time) as max_collect_time
from (
    select vin,
           report_no,
           max(collect_time) as max_collect_time,
           min(collect_time) as min_collect_time,
           count(*) as error_count
    from source_kafka_table_rts_diffence_msg
    group by vin, report_no
) t002
It runs fine locally in IDEA, but when submitted to the cluster it fails with the following error:
2024-05-11 11:21:08,719 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - GlobalGroupAggregate[46] -> Calc[47] -> LocalGroupAggregate[48] (3/3) (4747bbadf2bc0c6196fcdf3105443d9b) switched from INITIALIZING to FAILED on container_e10_1697183766965_0747_01_000002 @ ecs-vat-bigdata-test-new-cdh4 (dataPort=34873).
java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$1677'
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.operators.aggregate.MiniBatchLocalGroupAggFunction.open(MiniBatchLocalGroupAggFunction.java:59) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.operators.bundle.AbstractMapBundleOperator.open(AbstractMapBundleOperator.java:82) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.0.jar:1.15.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.0.jar:1.15.0]
... 12 more
Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.0.jar:1.15.0]
... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.0.jar:1.15.0]
... 12 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 16, Column 28: Cannot determine simple type name "org"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3927) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$25.getType(UnitCompiler.java:8271) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6873) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$14400(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6499) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6494) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4310) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6494) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6855) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$14200(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6497) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6494) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4224) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6494) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9026) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5062) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.0.jar:1.15.0]
Could anyone help me figure out what the problem is?
Reference answer:
This error message indicates that Flink hit a compile problem at runtime while instantiating generated Table code. The Janino message "Cannot determine simple type name "org"" means the embedded compiler could not resolve a class whose fully qualified name starts with "org". Possible causes:
Import/classpath problem: check that the job's dependencies (especially packages starting with "org", such as org.apache.flink...) are actually present on the cluster classpath, not only in the IDE.
Type reference error: a type may be referenced with an incomplete name; always use fully qualified class names (such as org.apache.flink.table.api.TableEnvironment).
Flink version compatibility: confirm that your Flink version (1.15.0) is compatible with the other dependencies you build against (Table API, Calcite, and so on); mismatched versions can break compilation of generated code.
Packaging/classloading: since the job works in IDEA but fails only on the cluster, compare the two classpaths. Note that the stack trace shows Flink's shaded Guava classes being loaded from the job jar (atp-vat-flink-data-1.0-SNAPSHOT.jar) rather than from the Flink distribution, which usually means flink-table runtime classes were bundled into the fat jar and clash with the copies under the cluster's lib directory.
To diagnose further, look at the generated code the error points to (line 16, column 28), especially the part involving "org". If you can share that code or more context, it will be easier to pin down. Also try upgrading to the latest 1.15.x patch release, since this may be a known issue that has already been fixed.
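A common packaging fix (a sketch, assuming a Maven build; artifact IDs and versions must match your project) is to keep the Table planner/runtime out of the fat jar by marking them provided, so the cluster's own copies are used:

<!-- Provided by the Flink distribution on the cluster; do not bundle. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.15.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>1.15.0</version>
    <scope>provided</scope>
</dependency>

As a quick experiment, setting classloader.resolve-order: parent-first in flink-conf.yaml can also confirm whether child-first loading of duplicated classes is the culprit.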
For more answers to this question, click to view:
https://developer.aliyun.com/ask/622375
Question 2: Flink on Docker reports an error when starting the JobManager. Does anyone know the cause?
Does anyone know what causes this error? It occurs when starting the JobManager (jm) with Flink on Docker.
Reference answer:
The error occurs because the job uses "createLocalEnvironment()", which starts a local in-JVM environment instead of connecting to the cluster.
Fix: use StreamExecutionEnvironment.getExecutionEnvironment() instead:
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
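For context, a minimal sketch of a job entry point that works both in the IDE and on Docker (the class name and data are hypothetical):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DockerJob {
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() picks the environment that matches the
        // deployment context: the cluster environment when submitted to a
        // JobManager, a local environment when run inside the IDE.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").print();
        env.execute("docker-demo");
    }
}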
For more answers to this question, click to view:
https://developer.aliyun.com/ask/623572
Question 3: Is it normal that a Flink job still reads the full data set after setting table.exec.state.ttl = '24h'?
After setting table.exec.state.ttl = '24h' on a Flink job and restoring it from the latest checkpoint, I observe that it still reads the full data set. Is this normal? It feels like restoring from the specified checkpoint did not take effect.
Reference answer:
Setting table.exec.state.ttl = '24h' means that job state expires after 24 hours. This setting does not directly affect which data is read when the job is restored from a checkpoint.
When a Flink job is restored from the latest checkpoint, it resumes from the progress information saved in that checkpoint (for example, source offsets) rather than from a position derived from table.exec.state.ttl. If the checkpoint contains the full data state, recovery naturally starts processing from the position recorded in the checkpoint, which can look like a "full read" of the data; this is part of the normal recovery process that preserves the job's exactly-once semantics.
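As a minimal sketch (assuming a streaming TableEnvironment; the option string is the same one used in the question), the TTL is set like this and only controls state cleanup, not where sources resume:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateTtlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Idle state older than 24h becomes eligible for cleanup.
        // This does NOT change where sources resume after a restore; that is
        // determined by the offsets stored in the checkpoint.
        tEnv.getConfig().set("table.exec.state.ttl", "24h");
    }
}

If the job really re-reads everything from the beginning, first verify that it was actually started from the checkpoint (for example, flink run -s <checkpointPath> ...); otherwise the source simply falls back to its configured startup mode.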
For more answers to this question, click to view:
https://developer.aliyun.com/ask/623564
Question 4: What can I do about a Flink HBase short-circuit read error from the DataNode?
Flink HBase short-circuit read from the DataNode fails with java.net.SocketTimeoutException: read(2) error: Resource temporarily unavailable:
2024-05-20 08:22:25,735 WARN [RpcServer.default.RWQ.Fifo.read.handler=144,queue=12,port=16020] hdfs.BlockReaderFactory: BlockReaderFactory(fileName=/hbase/data/default/alerts/6aedb1697e29f1e4be88996849fbb716/data/e20886e05d7b4fbcac8e209800a76709, block=BP-1719712434-10.80.10.150-1522218330932:blk_1589343399_515603961): I/O error requesting file descriptors. Disabling domain socket DomainSocket(fd=753,path=/var/lib/hadoop-hdfs/dn_socket)
java.net.SocketTimeoutException: read(2) error: Resource temporarily unavailable
at org.apache.hadoop.net.unix.DomainSocket.readArray0(Native Method)
at org.apache.hadoop.net.unix.DomainSocket.access$000(DomainSocket.java:45)
at org.apache.hadoop.net.unix.DomainSocket$DomainInputStream.read(DomainSocket.java:532)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2292)
at org.apache.hadoop.hdfs.BlockReaderFactory.requestFileDescriptors(BlockReaderFactory.java:542)
at org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:490)
at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:782)
at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:716)
at org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
at org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1161)
at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1086)
at org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1439)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1402)
at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:89)
at org.apache.hadoop.hbase.io.hfile.HFileBlock.positionalReadWithExtra(HFileBlock.java:805)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readAtOffset(HFileBlock.java:1565)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockDataInternal(HFileBlock.java:1769)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockData(HFileBlock.java:1594)
at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.readBlock(HFileReaderImpl.java:1488)
at org.apache.hadoop.hbase.io.hfile.HFileBlockIndex$CellBasedKeyBlockIndexReader.loadDataBlockWithScanInfo(HFileBlockIndex.java:340)
at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:852)
at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:802)
at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seekAtOrAfter(StoreFileScanner.java:326)
at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seek(StoreFileScanner.java:227)
at org.apache.hadoop.hbase.regionserver.StoreFileScanner.enforceSeek(StoreFileScanner.java:470)
at org.apache.hadoop.hbase.regionserver.KeyValueHeap.pollRealKV(KeyValueHeap.java:369)
at org.apache.hadoop.hbase.regionserver.KeyValueHeap.<init>(KeyValueHeap.java:103)
at org.apache.hadoop.hbase.regionserver.KeyValueHeap.<init>(KeyValueHeap.java:81)
at org.apache.hadoop.hbase.regionserver.StoreScanner.resetKVHeap(StoreScanner.java:407)
at org.apache.hadoop.hbase.regionserver.StoreScanner.<init>(StoreScanner.java:257)
at org.apache.hadoop.hbase.regionserver.MobStoreScanner.<init>(MobStoreScanner.java:44)
at org.apache.hadoop.hbase.regionserver.HMobStore.createScanner(HMobStore.java:159)
at org.apache.hadoop.hbase.regionserver.HStore.getScanner(HStore.java:1943)
at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.initializeScanners(HRegion.java:6181)
at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.<init>(HRegion.java:6161)
at org.apache.hadoop.hbase.regionserver.HRegion.instantiateRegionScanner(HRegion.java:2809)
Reference answer:
The error java.net.SocketTimeoutException: read(2) error: Resource temporarily unavailable usually means that while reading data from an HDFS (Hadoop Distributed File System) DataNode, the client (in this context the Flink job or the HBase client) hit a network problem, or the DataNode could not respond to the read request in time.
This can have several causes; the following steps help to narrow down and fix the problem:
Network issues:
Check the network connection between the machines running the Flink job and the HDFS DataNodes.
Make sure no firewall or network ACLs block communication between them.
Ping the DataNode's IP address and check for packet loss or latency.
HDFS performance issues:
Check the health and performance metrics of the HDFS cluster, such as disk I/O, CPU and memory usage.
Look at the DataNode logs for related errors or warnings.
If the HDFS cluster is heavily loaded, consider adding resources or optimizing jobs to reduce the pressure on HDFS.
HBase and HDFS configuration:
Make sure the settings in the HBase and HDFS configuration files (hbase-site.xml and hdfs-site.xml) are reasonable, especially those related to I/O, timeouts and retries.
Check whether any custom configuration could be causing the problem.
Flink configuration:
Review the Flink job's configuration, especially the parts that interact with HDFS and HBase.
Try adjusting timeout-related settings, such as taskmanager.network.timeout.
HBase short-circuit reads:
Short-circuit reads are a feature HBase uses to optimize read performance: they let the HBase client read data files directly from the local disk, bypassing the DataNode's data transfer path.
If short-circuit reads are misconfigured or there is a compatibility problem, errors like the one above can occur. Check HBase/HDFS version compatibility and verify the short-circuit read configuration (see the configuration sketch after this list).
HDFS DataNode problems:
If a DataNode is unhealthy (disk failure, memory pressure, and so on), it may fail to respond to read requests in time.
Use Hadoop's administration tools (such as the admin UI or the command-line tools) to check DataNode health and status.
If necessary, restart the problematic DataNode or the HDFS cluster.
Log analysis:
Carefully analyze the Flink, HBase and HDFS log files for more details about the error.
Pay attention to log entries related to network connections, I/O errors, timeouts or retries.
Resource contention:
If multiple jobs in the cluster run concurrently and compete for the same resources (CPU, memory, disk I/O or network bandwidth), performance can degrade and timeouts can occur.
Adjust job scheduling or resource allocation to reduce contention.
Upgrades and patches:
Make sure the Flink, HBase and HDFS versions you run are up to date, or that all relevant security patches and performance fixes have been applied.
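For reference, a minimal short-circuit read configuration sketch for hdfs-site.xml (the property names are the standard HDFS short-circuit settings; the socket path below is the one that appears in the log above):

<!-- Enable short-circuit local reads on the client (here, the HBase RegionServer). -->
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<!-- UNIX domain socket shared by the DataNode and the client; it must exist
     on both sides with correct permissions. -->
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>

Both the DataNode and the client must agree on dfs.domain.socket.path; a missing socket file or wrong permissions is a common reason for the "Disabling domain socket" warning seen in the log, after which reads fall back to the slower remote path.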
For more answers to this question, click to view:
https://developer.aliyun.com/ask/630117
Question 5: Flink CDC fails when capturing from an Oracle ADG standby database. How can this be solved?
Flink CDC reports an error when capturing data from an Oracle ADG standby database. How can this be resolved?
Reference answer:
Check the following configuration:
Redo log configuration: make sure the ADG standby has its redo logs configured correctly and that the log files are accessible to Debezium. The Oracle database should be running in archivelog mode, otherwise change events may not be captured correctly. Also check that the network connection is healthy.
Debezium connector configuration: confirm that the Debezium version you use supports Oracle ADG and is compatible with your Oracle version. Check the Debezium connector settings, in particular the ADG-related parameters such as server.id, database.history and database.history.fs.dir, and make sure Debezium can read the log file path of the ADG standby.
Flink CDC configuration: check the Flink CDC settings, including the connection string, username and password. Make sure the configuration handles DDL events, since they can affect data synchronization. If you need to filter specific tables or schemas, make sure those filters are correct (a source-table sketch follows below).
Beyond that, you will need to investigate step by step.
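For reference, a minimal sketch of a Flink SQL source table pointing at the standby (hostname, credentials, schema and table names are placeholders; the option keys are those of the flink-cdc Oracle connector):

CREATE TABLE oracle_source (
    ID BIGINT,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '<adg-standby-host>',
    'port' = '1521',
    'username' = 'flink_user',
    'password' = '<password>',
    'database-name' = 'ORCLCDB',
    'schema-name' = 'INVENTORY',
    'table-name' = 'PRODUCTS'
);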
For more answers to this question, click to view: