开发者社区> 问答> 正文

用flink sql实现row_Number时的问题

在使用flink sql的row_number时,order 的字段是必须带有时间属性的吗? 在" 实时计算 > Blink独享/共享集群(原产品线) > 最佳实践 > 视频直播行业最佳实践 > 视频直播解决方案之直播数字化运营"

这个示例中,
ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY app_room_user_cnt desc) AS ranking

这里的order by 并没有用时间属性的字段,是示例有误还是什么?

而我一个sql脚本在使用了时间属性之后

WATERMARK FOR pay_time AS withOffset(pay_time, 5000)

使用 row_number() over(partition by t1.s_id,t1.c_id order by t1.pay_time) s_rk

也直接报错了,报错信息如下

[error] null org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL statement. at org.apache.flink.table.client.gateway.athenax.AthenaxExecutor.explainJobGraphInternal(AthenaxExecutor.java:505) at org.apache.flink.table.client.gateway.athenax.AthenaxExecutor.explainJobGraph(AthenaxExecutor.java:464) at com.qingqing.streaming.compiler.CompilerExecutor.compileSql(CompilerExecutor.java:55) at com.qingqing.streaming.backend.service.JobService.checkSql(JobService.java:289) at com.qingqing.streaming.backend.controller.JobController.checkSql(JobController.java:118) at com.qingqing.streaming.backend.controller.JobController$$FastClassBySpringCGLIB$$b63c5ccc.invoke( ) at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88) at com.qingqing.streaming.backend.aspect.PermissionAspect.permissionProjectValidate(PermissionAspect.java:131) at sun.reflect.GeneratedMethodAccessor203.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:174) at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) at com.qingqing.streaming.backend.controller.JobController$$EnhancerBySpringCGLIB$$7982e427.checkSql( ) at sun.reflect.GeneratedMethodAccessor234.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:800) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981) at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:884) at javax.servlet.http.HttpServlet.service(HttpServlet.java:661) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:858) at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java:90) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117) at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:137) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:798) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:240) at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:240) at scala.collection.SeqLike$class.size(SeqLike.scala:106) at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:234) at scala.collection.mutable.Builder$class.sizeHint(Builder.scala:69) at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:22) at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:230) at scala.collection.TraversableLike$class.map(TraversableLike.scala:233) at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) at org.apache.flink.table.codegen.agg.DeclarativeAggCodeGen. (DeclarativeAggCodeGen.scala:69) at org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:183) at org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:175) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator.initialAggregateInformation(AggsHandlerCodeGenerator.scala:175) at org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:285) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.scala:325) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:208) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52) at org.apache.flink.table.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:52) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:42) at org.apache.flink.table.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:42) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translate(StreamExecSink.scala:155) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:100) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:52) at org.apache.flink.table.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:52) at org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$translate(StreamTableEnvironment.scala:799) at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$translate$1.apply(StreamTableEnvironment.scala:786) at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$translate$1.apply(StreamTableEnvironment.scala:785) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:785) at org.apache.flink.table.api.TableEnvironment.compile(TableEnvironment.scala:160) at org.apache.flink.table.api.StreamTableEnvironment.compile(StreamTableEnvironment.scala:146) at org.apache.flink.table.api.TableEnvironment.generateStreamGraph(TableEnvironment.scala:195) at org.apache.flink.table.api.TableEnvironment.generateStreamGraph(TableEnvironment.scala:185) at org.apache.flink.table.client.gateway.athenax.ExecutorContext$EnvironmentInstance.createPlan(ExecutorContext.java:452) at org.apache.flink.table.client.gateway.athenax.ExecutorContext$EnvironmentInstance.createJobGraph(ExecutorContext.java:442) at org.apache.flink.table.client.gateway.athenax.AthenaxExecutor.lambda$explainJobGraphInternal$9(AthenaxExecutor.java:501) at org.apache.flink.table.client.gateway.athenax.ExecutorContext.wrapClassLoader(ExecutorContext.java:220) at org.apache.flink.table.client.gateway.athenax.AthenaxExecutor.explainJobGraphInternal(AthenaxExecutor.java:500) ... 83 more

请问是什么问题?

展开
收起
游客weio4zjhzgvv6 2020-05-22 17:46:25 2536 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载