Flink报错问题之使用hive udf函数报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值

sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d from snmpprobe.p_snmp_ifxtable as p0m inner join snmpprobe.p_snmp_ifxtable as p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex where p5m.dt='2020-11-23' and p5m.hh='01' and p5m.mi='00' and p5m.mibindex=4 and p5m.ip='172.31.28.4' and p0m.dt='2020-11-23' and p0m.hh='00' and p0m.mi='55';

用flink sql client执行,计算结果是 coltime,a,c,coltime0, b,d 2020-11-23T01:00 ,3702300836,5541513669,2020-11-23T01:00,3702300836,5541513669 这里 coltime= coltime0,都是2020-11-23T01:00, 同时 a=b,c=d

hive 行命令查询结果是 2020-11-23 00:55:00.000000000,3702187169,5541332531,2020-11-23 01:00:00.000000000,3702300836,5541513669 coltime=2020-11-23 00:55:00.000000000 , coltime0=2020-11-23 01:00:00.000000000,a!=c, b!=d

flink 结果明显不正确,flink sql 的self join 需要什么特殊写法吗?

*来自志愿者整理的flink邮件归档



参考答案:

自己回答一下,供其他人参考。

换成flink 1.12.0-rc1,用相同sql处理相同数据,结果跟 hive 计算的结果相同,确认是 1.11.2 的一个bug,1.12应该已经改正了

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370013?spm=a2c6h.13066369.question.86.33bf585fC0wkjv



问题二:Flink SQL Row里嵌套Array<Row>该如何用DDL定义?

[image: image.png]

如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。

*来自志愿者整理的flink邮件归档



参考答案:

看起来你的DDL写的没有什么问题。

你用的是哪个Flink版本呢?

此外就是可以发下更完整的异常栈么?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370012?spm=a2c6h.13066369.question.89.33bf585fG2uqpl



问题三:SQL Cli中找不到DeserializationSchemaFactory

flink版本1.12.0:

我想在sql-client-defaults.yaml中配置一张表,配置如下:

tables:

  • name: t_users

type: source-table

connector:

property-version: 1

type: kafka

version: universal

topic: ods.userAnalysis.user_profile

startup-mode: latest-offset

properties:

bootstrap.servers: hostname:9092

group.id: flink-analysis

format:

type: debezium-avro-confluent

property-version: 1

debezium-avro-confluent.schema-registry.url: http://hostname:8081

#schema-registry.url: http://hostname:8081

schema:

  • name: userId

data-type: STRING

  • name: province

data-type: STRING

  • name: city

data-type: STRING

  • name: age

data-type: INT

  • name: education

data-type: STRING

  • name: jobType

data-type: STRING

  • name: marriage

data-type: STRING

  • name: sex

data-type: STRING

  • name: interest

data-type: STRING

我把相关的包都已经放到了lib目录下,启动sql cli时报错如下:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)

at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in

the classpath.

Reason: Required context properties mismatch.

The following properties are requested:

connector.properties.bootstrap.servers=henghe66:9092

connector.properties.group.id=flink-analysis

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=ods.userAnalysis.user_profile

connector.type=kafka

connector.version=universal

format.debezium-avro-confluent.schema-registry.url= http://192.168.101.43:8081

format.property-version=1

format.type=debezium-avro-confluent

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=userId

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=province

schema.2.data-type=VARCHAR(2147483647)

schema.2.name=city

schema.3.data-type=INT

schema.3.name=age

schema.4.data-type=VARCHAR(2147483647)

schema.4.name=education

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=jobType

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=marriage

schema.7.data-type=VARCHAR(2147483647)

schema.7.name=sex

schema.8.data-type=VARCHAR(2147483647)

schema.8.name=interest

The following factories have been considered:

org.apache.flink.formats.avro.AvroRowFormatFactory

org.apache.flink.formats.csv.CsvRowFormatFactory

org.apache.flink.formats.json.JsonRowFormatFactory

at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)

at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)

at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)

at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)

at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)

at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)

at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:185)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:138)

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)

... 3 more

此过程我不在sql-client-defaults.yaml中配置,而是启动sql cli后用DDL创建表是可以正常启动的。

所以难道是我在sql-client-defaults.yaml中配置错了吗?

请知道的大佬告知。

祝好!*来自志愿者整理的flink邮件归档



参考答案:

YAML file 中定义的 source sink 是通过老的 factory 来寻找的,debezium format 只实现了新接口,所以会找不到。 目前也没有计划在 YAML 中支持新接口,因为 YAML 的方式已经被废弃了。 可以看下这个issue: https://issues.apache.org/jira/browse/FLINK-20260

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370011?spm=a2c6h.13066369.question.90.33bf585fm3uxtD



问题四:flink使用hive udf函数会报错

Flink-1.11.1, hive-2.2.0 在使用current_timestamp或者current_date函数时会报 Caused by: java.lang.NullPointerException at org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51) at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)

*来自志愿者整理的flink邮件归档



参考答案:

这是一个已知问题 [1][2],新版本中我们只是简单的把这几个函数在hive module里禁掉了 [3],建议先用flink的函数来绕一下。

[1] https://issues.apache.org/jira/browse/FLINK-16688

[2] https://issues.apache.org/jira/browse/FLINK-16618

[3] https://issues.apache.org/jira/browse/FLINK-18995

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370010?spm=a2c6h.13066369.question.91.33bf585f5AUSSA



问题五:ProcessingTime下的watermark

使用flink1.11,在SQL ddl中基于process time声明watermark报错

SQL validation failed. Watermark can not be defined for a processing time

attribute column.

文档里关于watermark的解释也基本是跟eventTime在一起[1]

我想问的是基于processingTime的流处理是不需要watermark,还是被flink优化,不需要我们关心?

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks

*来自志愿者整理的flink邮件归档



参考答案:

watermark是对于数据的eventTime没有顺序到来帮助何时触发计算用的,你如果用processingTime来,processingTime肯定是递增的,就不存在乱序这个概念了,就不需要watermark了。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364614?spm=a2c6h.13066369.question.90.33bf585fphmTPb

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
24天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
141 27
|
3月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
48 0
|
4月前
|
SQL Java 关系型数据库
Hive常见的报错信息
文章列举了Hive常见的几种报错信息,并提供了错误复现、原因分析以及相应的解决方案。
446 1
|
5月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
217 1
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
80 0
|
6月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。

相关产品

  • 实时计算 Flink版