Flink报错问题之SQL作业中调用UDTF报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:一个source多个sink的同步问题

source是kafka,有一个rowtime定义:

.field("rowtime", DataTypes.TIMESTAMP(0)) .rowtime(Rowtime() .timestamps_from_field("actionTime") .watermarks_periodic_bounded(60000) )

有两个sink,第一个sink是直接把kafa的数据保存到postgres。 第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。 st_env.scan("source")

.window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))

.group_by("hourlywindow")

.select("udf(...)") ...

现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。

有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。

*来自志愿者整理的flink邮件归档



参考答案:

watermark 的计算是跟数据上的 event-time 相关的。你的数据是不是间隔一小时来一波的呢?

比如 10:00 的数据之后,就是 11:00 的数据,但是要1小时后才到来?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370149?spm=a2c6h.12873639.article-detail.108.6f9243783Lv0fl



问题二:Flink 1.11 SQL作业中调用UDTF 出现“No match found for func

本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:

@FunctionHint( input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")}, output = @DataTypeHint("STRING") ) public class Split extends TableFunction { public Split(){} public void eval(String str, String ch) { if (str == null || str.isEmpty()) { return; } else { String[] ss = str.split(ch); for (String s : ss) { collect(s); } } } }

在flink sql中通过 create function splitByChar as '..Split' 来创建这个function,在tableEnv 中调用executeSql(....) 来完成对这个 function的注册,在sql 后面的计算逻辑中 通过以下方式来调用这个UDTF create view view_source_1 as select dateTime,itime`, lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as T(s) on true;

结果一直出现以下错误信息: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 25 to line 3, column 47: No match found for function signature splitByChar( , ) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629) .................... Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 25 to line 3, column 47: r( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 8 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature splitByChar( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)

之前在flink 1.10 里面使用是正常的, 问下各位大佬有没有在flink 1.11 遇到过这个错误, 麻烦提供一下帮助。 | |

*来自志愿者整理的flink邮件归档



参考答案:

我感觉这应该是新版本的udf的bug,我在本地也可以复现。 已经建了一个issue[1] 来跟进。

[1] https://issues.apache.org/jira/browse/FLINK-18520

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370147?spm=a2c6h.12873639.article-detail.109.6f9243783Lv0fl



问题三:flink-state问题

官网上讲这些 keyed state(ValueState<T>,ReducingState<T> ,ListState<T> ,AggregatingState<IN, OUT> ,MapState<UK, UV> )

支持在keyed stream中使用,言下之意就是只能在KeyedProcessFunction中使用?但是实际使用中,我在ProcessAllWindowFunction和ProcessWindowFunction也能使用上述这些State,这是什么原因?

*来自志愿者整理的flink邮件归档



参考答案:

keyed state 只能在 keyeed stream 中使用。ProcessAllWindowFunction 和

ProcessWindowFunction 这两个都是 Window 上的 function,window 已经是 keyed stream 了

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370145?spm=a2c6h.12873639.article-detail.110.6f9243783Lv0fl



问题四:flink kafka connector中获取kafka元数据

flink table/sql api中,有办法获取kafka元数据吗?

tableEnvironment.sqlUpdate(CREATE TABLE MyUserTable (...) WITH

('connector.type' = 'kafka','connector.version' = '0.11' ,...))

*来自志愿者整理的flink邮件归档



参考答案:

kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370144?spm=a2c6h.12873639.article-detail.111.6f9243783Lv0fl



问题五:flink sql 读写写kafka表的时候可以指定消息的key吗

 flink sql 写kafka表的时候可以指定消息的key吗?

看官网的kafka connector没有找到消息key相关的说明

如果可以的话,如何指定?

谢谢

*来自志愿者整理的flink邮件归档



参考答案:

目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。

祝好, Leonard Xu [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell. https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370142?spm=a2c6h.12873639.article-detail.112.6f9243783Lv0fl

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1577 27
|
9月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
675 14
|
11月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
212 0
|
12月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
412 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
267 9
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
169 6
|
存储 SQL C++
对比 SQL Server中的VARCHAR(max) 与VARCHAR(n) 数据类型
【7月更文挑战7天】SQL Server 中的 VARCHAR(max) vs VARCHAR(n): - VARCHAR(n) 存储最多 n 个字符(1-8000),适合短文本。 - VARCHAR(max) 可存储约 21 亿个字符,适合大量文本。 - VARCHAR(n) 在处理小数据时性能更好,空间固定。 - VARCHAR(max) 对于大文本更合适,但可能影响性能。 - 选择取决于数据长度预期和业务需求。
1003 1
|
SQL Oracle 关系型数据库
MySQL、SQL Server和Oracle数据库安装部署教程
数据库的安装部署教程因不同的数据库管理系统(DBMS)而异,以下将以MySQL、SQL Server和Oracle为例,分别概述其安装部署的基本步骤。请注意,由于软件版本和操作系统的不同,具体步骤可能会有所变化。
1057 3
|
SQL 存储 安全
数据库数据恢复—SQL Server数据库出现逻辑错误的数据恢复案例
SQL Server数据库数据恢复环境: 某品牌服务器存储中有两组raid5磁盘阵列。操作系统层面跑着SQL Server数据库,SQL Server数据库存放在D盘分区中。 SQL Server数据库故障: 存放SQL Server数据库的D盘分区容量不足,管理员在E盘中生成了一个.ndf的文件并且将数据库路径指向E盘继续使用。数据库继续运行一段时间后出现故障并报错,连接失效,SqlServer数据库无法附加查询。管理员多次尝试恢复数据库数据但是没有成功。

相关产品

  • 实时计算 Flink版