Flink报错问题之使用hive udf函数报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值

sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d from snmpprobe.p_snmp_ifxtable as p0m inner join snmpprobe.p_snmp_ifxtable as p5m on p0m.id=p5m.id and p0m.mibindex=p5m.mibindex where p5m.dt='2020-11-23' and p5m.hh='01' and p5m.mi='00' and p5m.mibindex=4 and p5m.ip='172.31.28.4' and p0m.dt='2020-11-23' and p0m.hh='00' and p0m.mi='55';

用flink sql client执行,计算结果是 coltime,a,c,coltime0, b,d 2020-11-23T01:00 ,3702300836,5541513669,2020-11-23T01:00,3702300836,5541513669 这里 coltime= coltime0,都是2020-11-23T01:00, 同时 a=b,c=d

hive 行命令查询结果是 2020-11-23 00:55:00.000000000,3702187169,5541332531,2020-11-23 01:00:00.000000000,3702300836,5541513669 coltime=2020-11-23 00:55:00.000000000 , coltime0=2020-11-23 01:00:00.000000000,a!=c, b!=d

flink 结果明显不正确,flink sql 的self join 需要什么特殊写法吗?

*来自志愿者整理的flink邮件归档



参考答案:

自己回答一下,供其他人参考。

换成flink 1.12.0-rc1,用相同sql处理相同数据,结果跟 hive 计算的结果相同,确认是 1.11.2 的一个bug,1.12应该已经改正了

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370013?spm=a2c6h.13066369.question.86.33bf585fC0wkjv



问题二:Flink SQL Row里嵌套Array<Row>该如何用DDL定义?

[image: image.png]

如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。

*来自志愿者整理的flink邮件归档



参考答案:

看起来你的DDL写的没有什么问题。

你用的是哪个Flink版本呢?

此外就是可以发下更完整的异常栈么?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370012?spm=a2c6h.13066369.question.89.33bf585fG2uqpl



问题三:SQL Cli中找不到DeserializationSchemaFactory

flink版本1.12.0:

我想在sql-client-defaults.yaml中配置一张表,配置如下:

tables:

  • name: t_users

type: source-table

connector:

property-version: 1

type: kafka

version: universal

topic: ods.userAnalysis.user_profile

startup-mode: latest-offset

properties:

bootstrap.servers: hostname:9092

group.id: flink-analysis

format:

type: debezium-avro-confluent

property-version: 1

debezium-avro-confluent.schema-registry.url: http://hostname:8081

#schema-registry.url: http://hostname:8081

schema:

  • name: userId

data-type: STRING

  • name: province

data-type: STRING

  • name: city

data-type: STRING

  • name: age

data-type: INT

  • name: education

data-type: STRING

  • name: jobType

data-type: STRING

  • name: marriage

data-type: STRING

  • name: sex

data-type: STRING

  • name: interest

data-type: STRING

我把相关的包都已经放到了lib目录下,启动sql cli时报错如下:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)

at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in

the classpath.

Reason: Required context properties mismatch.

The following properties are requested:

connector.properties.bootstrap.servers=henghe66:9092

connector.properties.group.id=flink-analysis

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=ods.userAnalysis.user_profile

connector.type=kafka

connector.version=universal

format.debezium-avro-confluent.schema-registry.url= http://192.168.101.43:8081

format.property-version=1

format.type=debezium-avro-confluent

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=userId

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=province

schema.2.data-type=VARCHAR(2147483647)

schema.2.name=city

schema.3.data-type=INT

schema.3.name=age

schema.4.data-type=VARCHAR(2147483647)

schema.4.name=education

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=jobType

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=marriage

schema.7.data-type=VARCHAR(2147483647)

schema.7.name=sex

schema.8.data-type=VARCHAR(2147483647)

schema.8.name=interest

The following factories have been considered:

org.apache.flink.formats.avro.AvroRowFormatFactory

org.apache.flink.formats.csv.CsvRowFormatFactory

org.apache.flink.formats.json.JsonRowFormatFactory

at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)

at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)

at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)

at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)

at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)

at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)

at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)

at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)

at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)

at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:185)

at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:138)

at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)

... 3 more

此过程我不在sql-client-defaults.yaml中配置,而是启动sql cli后用DDL创建表是可以正常启动的。

所以难道是我在sql-client-defaults.yaml中配置错了吗?

请知道的大佬告知。

祝好!*来自志愿者整理的flink邮件归档



参考答案:

YAML file 中定义的 source sink 是通过老的 factory 来寻找的,debezium format 只实现了新接口,所以会找不到。 目前也没有计划在 YAML 中支持新接口,因为 YAML 的方式已经被废弃了。 可以看下这个issue: https://issues.apache.org/jira/browse/FLINK-20260

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370011?spm=a2c6h.13066369.question.90.33bf585fm3uxtD



问题四:flink使用hive udf函数会报错

Flink-1.11.1, hive-2.2.0 在使用current_timestamp或者current_date函数时会报 Caused by: java.lang.NullPointerException at org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51) at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)

*来自志愿者整理的flink邮件归档



参考答案:

这是一个已知问题 [1][2],新版本中我们只是简单的把这几个函数在hive module里禁掉了 [3],建议先用flink的函数来绕一下。

[1] https://issues.apache.org/jira/browse/FLINK-16688

[2] https://issues.apache.org/jira/browse/FLINK-16618

[3] https://issues.apache.org/jira/browse/FLINK-18995

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370010?spm=a2c6h.13066369.question.91.33bf585f5AUSSA



问题五:ProcessingTime下的watermark

使用flink1.11,在SQL ddl中基于process time声明watermark报错

SQL validation failed. Watermark can not be defined for a processing time

attribute column.

文档里关于watermark的解释也基本是跟eventTime在一起[1]

我想问的是基于processingTime的流处理是不需要watermark,还是被flink优化,不需要我们关心?

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks

*来自志愿者整理的flink邮件归档



参考答案:

watermark是对于数据的eventTime没有顺序到来帮助何时触发计算用的,你如果用processingTime来,processingTime肯定是递增的,就不存在乱序这个概念了,就不需要watermark了。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364614?spm=a2c6h.13066369.question.90.33bf585fphmTPb

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
1
1
0
1159
分享
相关文章
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
87 4
|
6月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
75 2
Hive根据用户自定义函数、reflect函数和窗口分析函数
Hive根据用户自定义函数、reflect函数和窗口分析函数
91 6
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
Flink UDF自动注册实践
1.注册UDF函数1.1 注册相关方法此处,我们使用的udf函数为标量函数,它继承的是ScalarFunction,该类在我们的使用中,发现它继承自UserDefinedFunction这个类,该处的udf函数由用户自己定义,而函数的注册此处我们自己实现; 函数注册时,使用flink的tableE.
2306 0
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
2102 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
263 0
Flink CDC 在阿里云实时计算Flink版的云上实践
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
249 56

相关产品

  • 实时计算 Flink版
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等