Question 1: Self inner join on a Hive table returns incorrect results with Flink 1.11.2
I inner join a table with itself, selecting two different points in time, but both sides come back with identical values.
The SQL is as follows; p5m and p0m are both the table snmpprobe.p_snmp_ifxtable, just at different times:
select p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c,
       p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d
from snmpprobe.p_snmp_ifxtable as p0m
inner join snmpprobe.p_snmp_ifxtable as p5m
  on p0m.id = p5m.id and p0m.mibindex = p5m.mibindex
where p5m.dt = '2020-11-23' and p5m.hh = '01' and p5m.mi = '00'
  and p5m.mibindex = 4 and p5m.ip = '172.31.28.4'
  and p0m.dt = '2020-11-23' and p0m.hh = '00' and p0m.mi = '55';
Running this in the Flink SQL client gives, as coltime, a, c, coltime0, b, d:
2020-11-23T01:00, 3702300836, 5541513669, 2020-11-23T01:00, 3702300836, 5541513669
Here coltime = coltime0 = 2020-11-23T01:00, and a = b, c = d.
The Hive command line returns:
2020-11-23 00:55:00.000000000, 3702187169, 5541332531, 2020-11-23 01:00:00.000000000, 3702300836, 5541513669
i.e. coltime = 2020-11-23 00:55:00.000000000, coltime0 = 2020-11-23 01:00:00.000000000, and a != b, c != d.
The Flink result is clearly wrong. Does a self join in Flink SQL require some special syntax?
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Answering my own question, for anyone else's reference.
After switching to Flink 1.12.0-rc1 and running the same SQL on the same data, the result matches Hive's. This confirms a bug in 1.11.2, which appears to be fixed in 1.12.
*From the volunteer-compiled Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370013?spm=a2c6h.13066369.question.86.33bf585fC0wkjv
Question 2: How do I define a Row with a nested Array<Row> in Flink SQL DDL?
[image: screenshot of the attempted DDL, not preserved in the archive]
As the title says, defining the type as in the screenshot raises an exception. Could someone from the community point out the correct approach?
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Your DDL looks fine as written.
Which Flink version are you using?
Also, could you post the full exception stack?
*From the volunteer-compiled Flink mailing list archive
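For reference, nested composite types of this shape are expressible in Flink DDL. A minimal sketch follows; the table name, fields, and connector settings are hypothetical, not from the original post (JSON is used here because it carries nested types naturally):

CREATE TABLE nested_rows (
  -- a ROW column that nests an ARRAY of ROWs
  payload ROW<
    id STRING,
    items ARRAY<ROW<name STRING, qty INT>>
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'nested_rows',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- reading the nested fields back
SELECT payload.id, payload.items FROM nested_rows;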
For more answers to this question, see:
https://developer.aliyun.com/ask/370012?spm=a2c6h.13066369.question.89.33bf585fG2uqpl
Question 3: DeserializationSchemaFactory not found in the SQL CLI
Flink version 1.12.0:
I want to configure a table in sql-client-defaults.yaml, as follows:
tables:
  - name: t_users
    type: source-table
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ods.userAnalysis.user_profile
      startup-mode: latest-offset
      properties:
        bootstrap.servers: hostname:9092
        group.id: flink-analysis
    format:
      type: debezium-avro-confluent
      property-version: 1
      debezium-avro-confluent.schema-registry.url: http://hostname:8081
      #schema-registry.url: http://hostname:8081
    schema:
      - name: userId
        data-type: STRING
      - name: province
        data-type: STRING
      - name: city
        data-type: STRING
      - name: age
        data-type: INT
      - name: education
        data-type: STRING
      - name: jobType
        data-type: STRING
      - name: marriage
        data-type: STRING
      - name: sex
        data-type: STRING
      - name: interest
        data-type: STRING
I have put all the relevant jars into the lib directory, but starting the SQL CLI fails with:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.properties.bootstrap.servers=henghe66:9092
connector.properties.group.id=flink-analysis
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ods.userAnalysis.user_profile
connector.type=kafka
connector.version=universal
format.debezium-avro-confluent.schema-registry.url= http://192.168.101.43:8081
format.property-version=1
format.type=debezium-avro-confluent
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=userId
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=province
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=city
schema.3.data-type=INT
schema.3.name=age
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=education
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=jobType
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=marriage
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=sex
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=interest
The following factories have been considered:
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.formats.csv.CsvRowFormatFactory
org.apache.flink.formats.json.JsonRowFormatFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
... 3 more
If I skip the sql-client-defaults.yaml configuration and instead create the table with DDL after starting the SQL CLI, everything works fine.
So is my sql-client-defaults.yaml configuration wrong?
Could someone who knows please advise.
Best regards! *From the volunteer-compiled Flink mailing list archive
Reference answer:
Sources and sinks defined in the YAML file are discovered through the old factory interface, while the debezium format only implements the new one, so no matching factory is found. There is also no plan to support the new interface in YAML, because the YAML approach has been deprecated. See this issue: https://issues.apache.org/jira/browse/FLINK-20260
*From the volunteer-compiled Flink mailing list archive
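Consistent with the asker's observation that DDL works, the same table can be declared directly in the SQL CLI, which goes through the new factory interface. A sketch assuming the Flink 1.12 option names for the Kafka connector and the debezium-avro-confluent format, with host names kept as redacted in the question:

CREATE TABLE t_users (
  userId    STRING,
  province  STRING,
  city      STRING,
  age       INT,
  education STRING,
  jobType   STRING,
  marriage  STRING,
  sex       STRING,
  interest  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods.userAnalysis.user_profile',
  'properties.bootstrap.servers' = 'hostname:9092',
  'properties.group.id' = 'flink-analysis',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'
);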
For more answers to this question, see:
https://developer.aliyun.com/ask/370011?spm=a2c6h.13066369.question.90.33bf585fm3uxtD
Question 4: Using Hive UDFs in Flink throws an error
Flink 1.11.1, Hive 2.2.0. Calling the current_timestamp or current_date function fails with:
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51)
at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)
*From the volunteer-compiled Flink mailing list archive
Reference answer:
This is a known issue [1][2]. In newer versions we simply disabled these functions in the hive module [3]; for now, the suggestion is to work around it with Flink's built-in functions.
[1] https://issues.apache.org/jira/browse/FLINK-16688
[2] https://issues.apache.org/jira/browse/FLINK-16618
[3] https://issues.apache.org/jira/browse/FLINK-18995
*From the volunteer-compiled Flink mailing list archive
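For instance, a minimal sketch of the suggested workaround, calling Flink's built-in functions instead of the failing Hive UDFs:

-- Flink's built-in equivalents of the Hive current_timestamp / current_date UDFs
SELECT CURRENT_TIMESTAMP AS now_ts,
       CURRENT_DATE      AS today;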
For more answers to this question, see:
https://developer.aliyun.com/ask/370010?spm=a2c6h.13066369.question.91.33bf585f5AUSSA
Question 5: Watermarks under processing time
With Flink 1.11, declaring a watermark on a processing-time attribute in a SQL DDL fails with:
SQL validation failed. Watermark can not be defined for a processing time attribute column.
The documentation also discusses watermarks essentially only together with event time [1].
My question: does processing-time stream processing simply not need a watermark, or does Flink handle it internally so that we don't have to care?
[1]
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Watermarks exist to help decide when to trigger computation when event-time data arrives out of order. If you use processing time, the timestamps are monotonically increasing by definition, so out-of-orderness cannot occur and no watermark is needed.
*From the volunteer-compiled Flink mailing list archive
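To illustrate the two models side by side, a sketch (table names and the datagen connector are placeholders for any real source): an event-time column takes a watermark, while a processing-time attribute is declared with PROCTIME() and no watermark:

-- Event time: data may arrive out of order, so a watermark is declared
CREATE TABLE events_event_time (
  id STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);

-- Processing time: monotonically increasing, so no watermark; use PROCTIME()
CREATE TABLE events_proc_time (
  id STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);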
For more answers to this question, see:
https://developer.aliyun.com/ask/364614?spm=a2c6h.13066369.question.90.33bf585fphmTPb