Flink报错问题之flink双流join报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:滑动窗口数据存储多份问题

由于第一次咨询,我不确定上一份邮件大家是否收到。 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide 份?

*来自志愿者整理的flink邮件归档



参考答案:

可以使用blink 的SQL,是通过pane 实现的,输出的时候才合并每个pane。参考`PanedWindowAssigner`

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370211?spm=a2c6h.12873639.article-detail.78.6f9243783Lv0fl



问题二:flink 双流join报错,java.lang.AssertionError

我使用flink 1.10.1 blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?

select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name as product_name, cast(B.balance as double) as balance from ( select toLong(behaviorTime, true) as recvTime, user_id, cast(regexp_extract(btnTitle, 'zbid={([^|])}', 1) as int) as live_id, regexp_extract(btnTitle, 'fundname={([^|])}', 1) as fund_name, regexp_extract(btnTitle, 'fundcode={([^|]*)}', 1) as fund_code, proctime from kafka_zl_etrack_event_stream where pageId = 'xxxx' and eventId = 'click' and btnId = 'xxxx and CHARACTER_LENGTH(user_id) > 4 ) A left join ( select customerNumber, balance, fundCode, lastUpdateTime, proctime from lscsp_sc_order_all where status = '4' and businessType IN ('4','5','14','16','17','18') and fundCode IS NOT NULL and balance IS NOT NULL and lastUpdateTime IS NOT NULL ) B on A.user_id = B.customerNumber and A.fund_code = B.fundCode group by A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, cast(B.balance as double)

Exception in thread "main" java.lang.AssertionError at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448) at org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737) at org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217) at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796) at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656) at org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522) at org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)

*来自志愿者整理的flink邮件归档



参考答案:

我在1.10版本中确实触发到了这个bug,切到1.11版本貌似就没这问题了。简单解释下问题:双流join的case,右边流join后的结果字段在获取时貌似乱序了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370210?spm=a2c6h.12873639.article-detail.79.6f9243783Lv0fl



问题三:自定义的sql connector在sql-cli中运行问题

我自定义了一个sql connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下 2020-07-14 10:36:29,148 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement. at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] Caused by: scala.MatchError: null at org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.Option.map(Option.scala:146) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341) ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:689) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 9 more flink版本1.10.1 blink planner 测试的sql为: Flink SQL> CREATE TABLE prometheus_table (value DOUBLE )WITH ('connector.type' = 'prometheus','connector.job' = 'testJob','connector.metrics' = 'testMetrics','connector.address' = 'localhost:9091'); Flink SQL> insert into prometheus_table select cast(100.01 as double) as value; 看报错的地方应该是 def inferSinkPhysicalSchema( queryLogicalType: RowType, sink: TableSink[]): TableSchema = { val withChangeFlag = sink match { case : RetractStreamTableSink[] | : UpsertStreamTableSink[] => true case : StreamTableSink[] => false case dsts: DataStreamTableSink[] => dsts.withChangeFlag } inferSinkPhysicalSchema(sink.getConsumedDataType, queryLogicalType, withChangeFlag) } sink没有match到,但是我的tablesink是实现了AppendStreamTableSink的 想远程debug调试一下,按照网上的方法[1]也没成功

大佬们有没有什么思路指导一下。感谢

[1]https://blog.csdn.net/xianzhen376/article/details/80117637 https://blog.csdn.net/xianzhen376/article/details/80117637

*来自志愿者整理的flink邮件归档



参考答案:

解决了,原因是我同时实现了createTableSink和createStreamTableSink导致 删掉createTableSink就可以了*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370208?spm=a2c6h.12873639.article-detail.80.6f9243783Lv0fl



问题四:flink-1.11 DDL 设置chk目录问题

目前我只会设置streameEnv.setStateBackend(new FsStateBackend(checkpointPath));

但是DDL时候应该如何设置呢?

tableEnv.getConfig().getConfiguration().set(

ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(

ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

*来自志愿者整理的flink邮件归档



参考答案:

没有太理解在DDL中设置,TableConfig上也可以设置 StreamEexecutionEnvironment 的 配置,你要的是这个吗?

tableEnv.getConfig().getConfiguration().set(CHECKPOINTS_DIRECTORY, "your-cp-path");*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370207?spm=a2c6h.12873639.article-detail.81.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之使用StarRocks作为Lookup Join的表是否合适
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
2月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
2月前
|
存储 监控 Oracle
实时计算 Flink版产品使用问题之如何解决双流Join导致的状态膨胀和资源压力问题
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
关系型数据库 Java Apache
Apache Flink 漫谈系列(09) - JOIN 算子
聊什么 在《Apache Flink 漫谈系列 - SQL概览》中我们介绍了JOIN算子的语义和基本的使用方式,介绍过程中大家发现Apache Flink在语法语义上是遵循ANSI-SQL标准的,那么再深思一下传统数据库为啥需要有JOIN算子呢?在实现原理上面Apache Flink内部实现和传统.
11563 0

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面