Flink报错问题之使用hive udf函数报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值

sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d from snmpprobe.p_snmp_ifxtable as p0m inner join snmpprobe.p_snmp_ifxtable as p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex where p5m.dt='2020-11-23' and p5m.hh='01' and p5m.mi='00' and p5m.mibindex=4 and p5m.ip='172.31.28.4' and p0m.dt='2020-11-23' and p0m.hh='00' and p0m.mi='55';

用flink sql client执行,计算结果是 coltime,a,c,coltime0, b,d 2020-11-23T01:00 ,3702300836,5541513669,2020-11-23T01:00,3702300836,5541513669 这里 coltime= coltime0,都是2020-11-23T01:00, 同时 a=b,c=d

hive 行命令查询结果是 2020-11-23 00:55:00.000000000,3702187169,5541332531,2020-11-23 01:00:00.000000000,3702300836,5541513669 coltime=2020-11-23 00:55:00.000000000 , coltime0=2020-11-23 01:00:00.000000000,a!=c, b!=d

flink 结果明显不正确,flink sql 的self join 需要什么特殊写法吗?

*来自志愿者整理的flink邮件归档



参考答案:

自己回答一下,供其他人参考。

换成flink 1.12.0-rc1,用相同sql处理相同数据,结果跟 hive 计算的结果相同,确认是 1.11.2 的一个bug,1.12应该已经改正了

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370013?spm=a2c6h.13066369.question.86.33bf585fC0wkjv



问题二:Flink SQL Row里嵌套Array<Row>该如何用DDL定义?

[image: image.png]

如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。

*来自志愿者整理的flink邮件归档



参考答案:

看起来你的DDL写的没有什么问题。

你用的是哪个Flink版本呢?

此外就是可以发下更完整的异常栈么?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370012?spm=a2c6h.13066369.question.89.33bf585fG2uqpl



问题三:SQL Cli中找不到DeserializationSchemaFactory

flink版本1.12.0:

我想在sql-client-defaults.yaml中配置一张表,配置如下:

tables:

  • name: t_users

type: source-table

connector:

property-version: 1

type: kafka

version: universal

topic: ods.userAnalysis.user_profile

startup-mode: latest-offset

properties:

bootstrap.servers: hostname:9092

group.id: flink-analysis

format:

type: debezium-avro-confluent

property-version: 1

debezium-avro-confluent.schema-registry.url: http://hostname:8081

#schema-registry.url: http://hostname:8081

schema:

  • name: userId

data-type: STRING

  • name: province

data-type: STRING

  • name: city

data-type: STRING

  • name: age

data-type: INT

  • name: education

data-type: STRING

  • name: jobType

data-type: STRING

  • name: marriage

data-type: STRING

  • name: sex

data-type: STRING

  • name: interest

data-type: STRING

我把相关的包都已经放到了lib目录下,启动sql cli时报错如下:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)

at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in

the classpath.

Reason: Required context properties mismatch.

The following properties are requested:

connector.properties.bootstrap.servers=henghe66:9092

connector.properties.group.id=flink-analysis

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=ods.userAnalysis.user_profile

connector.type=kafka

connector.version=universal

format.debezium-avro-confluent.schema-registry.url= http://192.168.101.43:8081

format.property-version=1

format.type=debezium-avro-confluent

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=userId

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=province

schema.2.data-type=VARCHAR(2147483647)

schema.2.name=city

schema.3.data-type=INT

schema.3.name=age

schema.4.data-type=VARCHAR(2147483647)

schema.4.name=education

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=jobType

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=marriage

schema.7.data-type=VARCHAR(2147483647)

schema.7.name=sex

schema.8.data-type=VARCHAR(2147483647)

schema.8.name=interest

The following factories have been considered:

org.apache.flink.formats.avro.AvroRowFormatFactory

org.apache.flink.formats.csv.CsvRowFormatFactory

org.apache.flink.formats.json.JsonRowFormatFactory

at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)

at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)

at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)

at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)

at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)

at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)

at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:185)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:138)

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)

... 3 more

此过程我不在sql-client-defaults.yaml中配置,而是启动sql cli后用DDL创建表是可以正常启动的。

所以难道是我在sql-client-defaults.yaml中配置错了吗?

请知道的大佬告知。

祝好!*来自志愿者整理的flink邮件归档



参考答案:

YAML file 中定义的 source sink 是通过老的 factory 来寻找的,debezium format 只实现了新接口,所以会找不到。 目前也没有计划在 YAML 中支持新接口,因为 YAML 的方式已经被废弃了。 可以看下这个issue: https://issues.apache.org/jira/browse/FLINK-20260

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370011?spm=a2c6h.13066369.question.90.33bf585fm3uxtD



问题四:flink使用hive udf函数会报错

Flink-1.11.1, hive-2.2.0 在使用current_timestamp或者current_date函数时会报 Caused by: java.lang.NullPointerException at org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51) at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)

*来自志愿者整理的flink邮件归档



参考答案:

这是一个已知问题 [1][2],新版本中我们只是简单的把这几个函数在hive module里禁掉了 [3],建议先用flink的函数来绕一下。

[1] https://issues.apache.org/jira/browse/FLINK-16688

[2] https://issues.apache.org/jira/browse/FLINK-16618

[3] https://issues.apache.org/jira/browse/FLINK-18995

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370010?spm=a2c6h.13066369.question.91.33bf585f5AUSSA



问题五:ProcessingTime下的watermark

使用flink1.11,在SQL ddl中基于process time声明watermark报错

SQL validation failed. Watermark can not be defined for a processing time

attribute column.

文档里关于watermark的解释也基本是跟eventTime在一起[1]

我想问的是基于processingTime的流处理是不需要watermark,还是被flink优化,不需要我们关心?

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks

*来自志愿者整理的flink邮件归档



参考答案:

watermark是对于数据的eventTime没有顺序到来帮助何时触发计算用的,你如果用processingTime来,processingTime肯定是递增的,就不存在乱序这个概念了,就不需要watermark了。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364614?spm=a2c6h.13066369.question.90.33bf585fphmTPb

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
11天前
|
SQL 数据库 开发工具
实时计算 Flink版产品使用合集之数据库中有新增索引,同步任务没有报错,索引的变动是否有影响
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
11天前
|
Oracle NoSQL 关系型数据库
实时计算 Flink版操作报错之报错:java.lang.ClassNotFoundException: io.debezium.connector.common.RelationalBaseSourceConnector,如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5天前
|
SQL HIVE
hive高频函数(一)
hive高频函数(一)
14 0
|
11天前
|
Java 关系型数据库 数据库连接
实时计算 Flink版操作报错之在使用JDBC连接MySQL数据库时遇到报错,识别不到jdbc了,怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
11天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错之连接读库时出现报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
11天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错之遇到报错“Metaspace out-of-memory error”是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
11天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错之在处理MySQL的DECIMAL类型时出现了报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5天前
|
SQL Java 程序员
Hive反射函数的使用-程序员是怎么学UDF函数的
Hive反射函数的使用-程序员是怎么学UDF函数的
4 0
|
5天前
|
SQL HIVE 数据格式
Hive高频函数(二)
Hive高频函数(二)
13 0
|
8天前
|
SQL 分布式计算 HIVE
Hive Cli / HiveServer2 中使用 dayofweek 函数引发的BUG!
在Hive 3.1.2和Spark 3.0.2集群环境中,遇到`dayofweek`函数bug。当`create_date`为字符串类型时,`dayofweek`函数结果错位。修复方法是将`create_date`转换为`date`类型。在Spark SQL中,原始代码能正常运行,未出现此问题。因此建议在Hive中使用转换后的日期类型以避免错误。

热门文章

最新文章

相关产品

  • 实时计算 Flink版