Flink报错问题之flink 1.11指定rowtime字段报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink SQL处理Array型的JSON

Kafka中的JSON结构是个Array例子如下。

[

{ "id": 1},

{ "id": 2}

]

读出来变成表的两行。Flink SQL层面最佳实践是什么?

如果没有办法是不是只能改JSON结构了。

*来自志愿者整理的flink邮件归档



参考答案:

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370174?spm=a2c6h.12873639.article-detail.93.6f9243783Lv0fl



问题二:flink 同时sink hbase和hive,hbase少记录

flink订阅kafka消息,同时sink到hbase和hive中, 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条

query: streamTableEnv.executeSql( """ | |CREATE TABLE hbase_table ( | rowkey VARCHAR, | cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |) WITH ( | 'connector.type' = 'hbase', | 'connector.version' = '2.1.0', | 'connector.table-name' = 'ods:user_hbase6', | 'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.zookeeper.znode.parent' = '/hbase', | 'connector.write.buffer-flush.max-size' = '1mb', | 'connector.write.buffer-flush.max-rows' = '1', | 'connector.write.buffer-flush.interval' = '0s' |) |""".stripMargin)

val statementSet = streamTableEnv.createStatementSet() val insertHbase = """ |insert into hbase_table |SELECT | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid, | ROW(sex, age, created_time ) as cf |FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table) | |""".stripMargin

statementSet.addInsertSql(insertHbase)

val insertHive = """ | |INSERT INTO odsCatalog.ods.hive_table |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH') |FROM kafka_table | |""".stripMargin statementSet.addInsertSql(insertHive)

statementSet.execute()

是因为参数'connector.write.buffer-flush.max-size' = '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下: Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1

并且,按照官网文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html

设置参数也不识别,报错: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

看了一下源码, org.apache.flink.table.descriptors.HBaseValidator public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase"; public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0"; public static final String CONNECTOR_TABLE_NAME = "connector.table-name"; public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum"; public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval"; 参数还是老参数

*来自志愿者整理的flink邮件归档



参考答案:

(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 BufferredMutator 做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval 设置为 0s 时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval 设置成 1s 应该就能看到数据了。

(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1][1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370173?spm=a2c6h.12873639.article-detail.94.6f9243783Lv0fl



问题三:【Flink Join内存问题】

我看源码里写到JoinedStreams:

也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom

那么有什么预防措施呢?

将key值多的一边进行打散?

Right now, the join is being evaluated in memory so you need to ensure that the number

  • of elements per key does not get too high. Otherwise the JVM might crash.

*来自志愿者整理的flink邮件归档



参考答案:

egular join会缓存两边流的所有数据,interval join只存一段时间内的,相比当然节省很大的状态存储

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370172?spm=a2c6h.12873639.article-detail.95.6f9243783Lv0fl



问题四:flink 1.11 createTemporaryTable 指定 rowtime 字段报 Fie

使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 createTemporaryTable 为事件时间,程序包 Field null does not exist 错误,是我用法有问题? 看了下 https://issues.apache.org/jira/browse/FLINK-16160 https://issues.apache.org/jira/browse/FLINK-16160 这个 issue 是解决的这个问题吗?

tableEnv.connect(kafka) .withSchema( new Schema().field("searchTime", DataTypes.TIMESTAMP()).rowtime(rowtime); ) .withFormat( new Json().failOnMissingField(false) ) .createTemporaryTable("tablename");

*来自志愿者整理的flink邮件归档



参考答案:

Btw,TableEnvironment上的 connect API 目前不建议使用,有许多已知的问题和缺失的 feature,建议用

executeSql(ddl) 来替代。

社区计划在 1.12 中系统地重构和修复 connect API 。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370169?spm=a2c6h.12873639.article-detail.96.6f9243783Lv0fl



问题五:单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及s

单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?请问下,有没有大佬做过类似的事情?

另外,flink side out功能,可以将单流分成多流,但是不是分成多流后,但两条流sink的时候,是不是没法保证sink时候的时序?

*来自志愿者整理的flink邮件归档



参考答案:

你可以先用 map 再用 addSink,这样他们的调用被 chain 在一起,可以达到先写入 mysql ,再写入 kafka 的目的。

datastream.map(new MySQLSinkMapFunction()).addSink(new

FlinkKafkaProducer()).

也就是将 mysql sink 伪装成了一个 MapFunction,里面先做了 写 mysql 的动作,写成功后再将数据输出到下游。

另外,如果要在 SQL 中解决这个需求的话,会比较麻烦,因为标准语法中没有这么个语法支持这个功能。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370168?spm=a2c6h.12873639.article-detail.97.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
监控 关系型数据库 数据处理
实时计算 Flink版产品使用问题之遇到中文字段在读取时转换不当,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
5月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
5月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Oracle NoSQL
实时计算 Flink版操作报错合集之报错“找不到对应的归档日志文件”,怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到iava.lang.NoClassDefFoundError: ververica/cdc/common/utils/StrinaUtils错误,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版