开发者社区> 问答> 正文

flink sql提交读取hive的批作业出现 异常“scala MatchError : MAP”

作业SQL是这样的: create view 111 as select * from table1 where event_id = '0103002' and day='2020-05-13' and hour='13';

create view view_1 as select day, a.rtime as itime, a.uid as uid, trim(BOTH a.event.log_1['scene']) as refer_list, T.s as abflags, a.hdid as hdid, a.country as country from 111 as a left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_2['abflag]), ',')) as T(s) on true;

CREATE VIEW view_6 as SELECT uid, refer_list, abflag, last_value(country) FROM view_1 where refer_list in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR') GROUP BY uid, refer_list, abflag;

提交的时候,出现异常信息如下: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot) at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) ... 11 more Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot) at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212) at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)

查看CodeGenUtils.scala 发现hashCodeForType方法中没有匹配map类型, 修改了一处代码,如下: def hashCodeForType( ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match { case BOOLEAN => s"${className[JBoolean]}.hashCode($term)" case TINYINT => s"${className[JByte]}.hashCode($term)" case SMALLINT => s"${className[JShort]}.hashCode($term)" case INTEGER => s"${className[JInt]}.hashCode($term)" case BIGINT => s"${className[JLong]}.hashCode($term)" case FLOAT => s"${className[JFloat]}.hashCode($term)" case DOUBLE => s"${className[JDouble]}.hashCode($term)" case VARCHAR | CHAR => s"$term.hashCode()" case VARBINARY | BINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" + s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)" case DECIMAL => s"$term.hashCode()" case DATE => s"${className[JInt]}.hashCode($term)" case TIME_WITHOUT_TIME_ZONE => s"${className[JInt]}.hashCode($term)" case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$term.hashCode()" case INTERVAL_YEAR_MONTH => s"${className[JInt]}.hashCode($term)" case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)" case ARRAY => throw new IllegalArgumentException(s"Not support type to hash: $t") //case MAP => s"${className[BaseMap]}.getHashCode($term)" // 新加的一行代码 case ROW => val rowType = t.asInstanceOf[RowType]

作业可以正常提交了, 运行一段时间,又出现异常: java.lang.RuntimeException: Could not instantiate generated class 'HashAggregateWithKeys$1543' at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67) at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:46) at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48) at org.apache.flink.streaming.runtime.tasks.OperatorChain. (OperatorChain.java:156)

*来自志愿者整理的flink邮件归档 at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65) ... 8 more Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) ... 10 more Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ... 13 more Caused by: org.codehaus.commons.compiler.CompileException: Line 459, Column 57: A method named "compareTo" is not declared in any enclosing class nor any supertype, nor through a static import at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)*来自志愿者整理的FLINK邮件归档

展开
收起
毛毛虫雨 2021-12-05 06:23:48 1146 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载