Flink报错问题之使用hive udf函数报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值

sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d from snmpprobe.p_snmp_ifxtable as p0m inner join snmpprobe.p_snmp_ifxtable as p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex where p5m.dt='2020-11-23' and p5m.hh='01' and p5m.mi='00' and p5m.mibindex=4 and p5m.ip='172.31.28.4' and p0m.dt='2020-11-23' and p0m.hh='00' and p0m.mi='55';

用flink sql client执行,计算结果是 coltime,a,c,coltime0, b,d 2020-11-23T01:00 ,3702300836,5541513669,2020-11-23T01:00,3702300836,5541513669 这里 coltime= coltime0,都是2020-11-23T01:00, 同时 a=b,c=d

hive 行命令查询结果是 2020-11-23 00:55:00.000000000,3702187169,5541332531,2020-11-23 01:00:00.000000000,3702300836,5541513669 coltime=2020-11-23 00:55:00.000000000 , coltime0=2020-11-23 01:00:00.000000000,a!=c, b!=d

flink 结果明显不正确,flink sql 的self join 需要什么特殊写法吗?

*来自志愿者整理的flink邮件归档



参考答案:

自己回答一下,供其他人参考。

换成flink 1.12.0-rc1,用相同sql处理相同数据,结果跟 hive 计算的结果相同,确认是 1.11.2 的一个bug,1.12应该已经改正了

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370013?spm=a2c6h.13066369.question.86.33bf585fC0wkjv



问题二:Flink SQL Row里嵌套Array<Row>该如何用DDL定义?

[image: image.png]

如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。

*来自志愿者整理的flink邮件归档



参考答案:

看起来你的DDL写的没有什么问题。

你用的是哪个Flink版本呢?

此外就是可以发下更完整的异常栈么?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370012?spm=a2c6h.13066369.question.89.33bf585fG2uqpl



问题三:SQL Cli中找不到DeserializationSchemaFactory

flink版本1.12.0:

我想在sql-client-defaults.yaml中配置一张表,配置如下:

tables:

  • name: t_users

type: source-table

connector:

property-version: 1

type: kafka

version: universal

topic: ods.userAnalysis.user_profile

startup-mode: latest-offset

properties:

bootstrap.servers: hostname:9092

group.id: flink-analysis

format:

type: debezium-avro-confluent

property-version: 1

debezium-avro-confluent.schema-registry.url: http://hostname:8081

#schema-registry.url: http://hostname:8081

schema:

  • name: userId

data-type: STRING

  • name: province

data-type: STRING

  • name: city

data-type: STRING

  • name: age

data-type: INT

  • name: education

data-type: STRING

  • name: jobType

data-type: STRING

  • name: marriage

data-type: STRING

  • name: sex

data-type: STRING

  • name: interest

data-type: STRING

我把相关的包都已经放到了lib目录下,启动sql cli时报错如下:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)

at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in

the classpath.

Reason: Required context properties mismatch.

The following properties are requested:

connector.properties.bootstrap.servers=henghe66:9092

connector.properties.group.id=flink-analysis

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=ods.userAnalysis.user_profile

connector.type=kafka

connector.version=universal

format.debezium-avro-confluent.schema-registry.url= http://192.168.101.43:8081

format.property-version=1

format.type=debezium-avro-confluent

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=userId

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=province

schema.2.data-type=VARCHAR(2147483647)

schema.2.name=city

schema.3.data-type=INT

schema.3.name=age

schema.4.data-type=VARCHAR(2147483647)

schema.4.name=education

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=jobType

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=marriage

schema.7.data-type=VARCHAR(2147483647)

schema.7.name=sex

schema.8.data-type=VARCHAR(2147483647)

schema.8.name=interest

The following factories have been considered:

org.apache.flink.formats.avro.AvroRowFormatFactory

org.apache.flink.formats.csv.CsvRowFormatFactory

org.apache.flink.formats.json.JsonRowFormatFactory

at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)

at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)

at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)

at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)

at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)

at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)

at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:185)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:138)

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)

... 3 more

此过程我不在sql-client-defaults.yaml中配置,而是启动sql cli后用DDL创建表是可以正常启动的。

所以难道是我在sql-client-defaults.yaml中配置错了吗?

请知道的大佬告知。

祝好!*来自志愿者整理的flink邮件归档



参考答案:

YAML file 中定义的 source sink 是通过老的 factory 来寻找的,debezium format 只实现了新接口,所以会找不到。 目前也没有计划在 YAML 中支持新接口,因为 YAML 的方式已经被废弃了。 可以看下这个issue: https://issues.apache.org/jira/browse/FLINK-20260

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370011?spm=a2c6h.13066369.question.90.33bf585fm3uxtD



问题四:flink使用hive udf函数会报错

Flink-1.11.1, hive-2.2.0 在使用current_timestamp或者current_date函数时会报 Caused by: java.lang.NullPointerException at org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51) at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)

*来自志愿者整理的flink邮件归档



参考答案:

这是一个已知问题 [1][2],新版本中我们只是简单的把这几个函数在hive module里禁掉了 [3],建议先用flink的函数来绕一下。

[1] https://issues.apache.org/jira/browse/FLINK-16688

[2] https://issues.apache.org/jira/browse/FLINK-16618

[3] https://issues.apache.org/jira/browse/FLINK-18995

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370010?spm=a2c6h.13066369.question.91.33bf585f5AUSSA



问题五:ProcessingTime下的watermark

使用flink1.11,在SQL ddl中基于process time声明watermark报错

SQL validation failed. Watermark can not be defined for a processing time

attribute column.

文档里关于watermark的解释也基本是跟eventTime在一起[1]

我想问的是基于processingTime的流处理是不需要watermark,还是被flink优化,不需要我们关心?

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks

*来自志愿者整理的flink邮件归档



参考答案:

watermark是对于数据的eventTime没有顺序到来帮助何时触发计算用的,你如果用processingTime来,processingTime肯定是递增的,就不存在乱序这个概念了,就不需要watermark了。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364614?spm=a2c6h.13066369.question.90.33bf585fphmTPb

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL 分布式计算 数据库
【YashanDB 知识库】Hive 命令工具 insert 崖山数据库报错
【YashanDB 知识库】Hive 命令工具 insert 崖山数据库报错
|
6月前
|
SQL 分布式计算 关系型数据库
【YashanDB知识库】hive初始化崖山报错YAS-04209
【YashanDB知识库】hive初始化崖山报错YAS-04209
|
6月前
|
SQL 分布式计算 数据库
【YashanDB知识库】Hive 命令工具insert崖山数据库报错
【YashanDB知识库】Hive 命令工具insert崖山数据库报错
|
6月前
|
SQL 分布式计算 关系型数据库
【YashanDB知识库】hive初始化崖山报错YAS-04209
【YashanDB知识库】hive初始化崖山报错YAS-04209
|
9月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
965 27
|
11月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
164 4
|
11月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
129 2
|
SQL JavaScript 前端开发
Hive根据用户自定义函数、reflect函数和窗口分析函数
Hive根据用户自定义函数、reflect函数和窗口分析函数
187 6
|
SQL Java 关系型数据库
Hive常见的报错信息
文章列举了Hive常见的几种报错信息,并提供了错误复现、原因分析以及相应的解决方案。
1333 1
|
11月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
130 0

相关产品

  • 实时计算 Flink版