flink问题之cep超时事件如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 双流join报错,java.lang.AssertionError


我使用flink 1.10.1 blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?

select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name as product_name, cast(B.balance as double) as balance from ( select toLong(behaviorTime, true) as recvTime, user_id, cast(regexp_extract(btnTitle, 'zbid={([^|])}', 1) as int) as live_id, regexp_extract(btnTitle, 'fundname={([^|])}', 1) as fund_name, regexp_extract(btnTitle, 'fundcode={([^|]*)}', 1) as fund_code, proctime from kafka_zl_etrack_event_stream where pageId = 'xxxx' and eventId = 'click' and btnId = 'xxxx and CHARACTER_LENGTH(user_id) > 4 ) A left join ( select customerNumber, balance, fundCode, lastUpdateTime, proctime from lscsp_sc_order_all where status = '4' and businessType IN ('4','5','14','16','17','18') and fundCode IS NOT NULL and balance IS NOT NULL and lastUpdateTime IS NOT NULL ) B on A.user_id = B.customerNumber and A.fund_code = B.fundCode group by A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, cast(B.balance as double)

Exception in thread "main" java.lang.AssertionError at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448) at org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737) at org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217) at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796) at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656) at org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522) at org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)


参考回答:

我在1.10版本中确实触发到了这个bug,切到1.11版本貌似就没这问题了。简单解释下问题:双流join的case,右边流join后的结果字段在获取时貌似乱序了。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372136


问题二:自定义的sql connector在sql-cli中运行问题


我自定义了一个sql connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下 2020-07-14 10:36:29,148 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement. at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] Caused by: scala.MatchError: null at org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.Option.map(Option.scala:146) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341) ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:689) ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 9 more flink版本1.10.1 blink planner 测试的sql为: Flink SQL> CREATE TABLE prometheus_table (value DOUBLE )WITH ('connector.type' = 'prometheus','connector.job' = 'testJob','connector.metrics' = 'testMetrics','connector.address' = 'localhost:9091'); Flink SQL> insert into prometheus_table select cast(100.01 as double) as value; 看报错的地方应该是 def inferSinkPhysicalSchema( queryLogicalType: RowType, sink: TableSink[]): TableSchema = { val withChangeFlag = sink match { case : RetractStreamTableSink[] | : UpsertStreamTableSink[] => true case : StreamTableSink[] => false case dsts: DataStreamTableSink[] => dsts.withChangeFlag } inferSinkPhysicalSchema(sink.getConsumedDataType, queryLogicalType, withChangeFlag) } sink没有match到,但是我的tablesink是实现了AppendStreamTableSink的 想远程debug调试一下,按照网上的方法[1]也没成功

大佬们有没有什么思路指导一下。感谢

[1]https://blog.csdn.net/xianzhen376/article/details/80117637 https://blog.csdn.net/xianzhen376/article/details/80117637


参考回答:

解决了,原因是我同时实现了createTableSink和createStreamTableSink导致 删掉createTableSink就可以了


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372139


问题三:flink-1.11 DDL 设置chk目录问题


目前我只会设置streameEnv.setStateBackend(new FsStateBackend(checkpointPath));

但是DDL时候应该如何设置呢?

tableEnv.getConfig().getConfiguration().set(

ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(

ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));


参考回答:

没有太理解在DDL中设置,TableConfig上也可以设置 StreamEexecutionEnvironment 的 配置,你要的是这个吗?

tableEnv.getConfig().getConfiguration().set(CHECKPOINTS_DIRECTORY, "your-cp-path");


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372143


问题四:不能实时读取实时写入到 Hive 的数据


试验了一下 Flink-1.11 hive streaming 的功能

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html

创建 kafka 表,通过 SQL 实时写入 Hive.

但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。


参考回答:

你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

就在你看得这个页面应该有对应的文档说明如何读取hive数据。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372148


问题五:flink cep 如何处理超时事件?


想请教下各位。

我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。

但是我定义pattern 后发现,我的这个没办法在一条事件数据上完成判定。必须借助和上一事件数据比较之后判断是不是超时。

想知道该如何定义pattern 能够,取到排序之后前后两个两个事件。


参考回答:

flink使用event time,然后类似下面这样可以吗?

Pattern.begin("a").next("b").within(Time.minutes(1));


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372151

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
65 0
|
11天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
34 9
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
41 0
|
1月前
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
53 0
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
监控 Java API
【揭秘】如何用Flink CEP揪出那些偷偷摸摸连续登录失败的“捣蛋鬼”?——一场数据流中的侦探游戏
【8月更文挑战第26天】Flink 是一款先进的流处理框架,提供复杂事件处理(CEP)功能以识别实时数据流中的特定模式。CEP 在 Flink 中通过 `CEP` API 实现,支持基于模式匹配的事件检测。本文通过监测用户连续三次登录失败的具体案例介绍 Flink CEP 的工作原理与应用方法。首先创建 Flink 环境并定义数据源,接着利用 CEP 定义连续三次失败登录的模式,最后处理匹配结果并输出警报。Flink CEP 能够轻松扩展至更复杂的场景,如异常行为检测和交易欺诈检测等,有效应对多样化的业务需求。
44 0
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
16天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
690 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。

相关产品

  • 实时计算 Flink版