Flink报错问题之用flush方法写入hbase报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:关于Catalog的建议

目前Flink提供memory、jdbc、hive这3种catalog。 感觉实际使用中,可以使用如下几种方案。 (1)选择memory catalog,然后每次sql都带上自己的相关DDL。 (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。

方案1和方案2各有优缺点。

方案1的优点: 比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的) 方案1的缺点: 很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。

-----然后,我的问题来了。 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。

当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。*来自志愿者整理的flink邮件归档



参考答案:

FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。

关于你的两个问题:

  1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog [1],文档也许是可以说的更明确一些。 2. 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364613?spm=a2c6h.13066369.question.93.33bf585ft9pCfh



问题二:Flink SQL 对延迟数据怎么处理?

请教一下,FlinkSQL中,我在创建表时设置了watermark并设置了最大延迟,可是还是有数据依旧会迟到晚到,对于这样的数据我们又不想直接丢弃,那这个依旧迟到的数据我该怎么收集?是否有与StreamAPI一样可以将依旧迟到的数据进行分流的方案?

*来自志愿者整理的flink邮件归档



参考答案:

据我所知,FlinkSQL 不支持将迟到的数据输出到侧流中。 如果你下游使用的是 window 的话,可以通过设置 table.exec.emit.late-fire.enabledtable.exec.emit.late-fire.delay 来触发晚于 watermark 到达的数据。 其中允许等待晚与 watermark 的数据的时间由 table.exec.state.ttl 控制,等价于 Datastream 中的 allowedLateness, 故 window 的最大等待时间为 watermark 的 outOfOrder + allowedLateness。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364611?spm=a2c6h.13066369.question.94.33bf585f61kRwL



问题三:关于flink实时写入hbase用flush方法频繁报操作超时问题

我用flink实时写入hbase,继承RichSinkFunction后用的hbase的BufferedMutator,每当写入一定量的数据后,就用flush的方法,类似这样: http://apache-flink.147419.n8.nabble.com/file/t802/%E6%8D%95%E8%8E%B71.png 但是我的任务会频繁报出如下错误: http://apache-flink.147419.n8.nabble.com/file/t802/%E6%8D%95%E8%8E%B7.png 感觉貌似是我代码的问题导致的,但又不知道原因,希望得到指导,感激不尽~ *来自志愿者整理的flink邮件归档



参考答案:

这个错误感觉是 Hbase 的错误。具体实现的话,你可以参考社区的 HBaseSinkFunction[1] 的实现。 [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364608?spm=a2c6h.13066369.question.95.33bf585fjetCy0



问题四:flink sql 中是否可以使用 mysql 的存储过程和函数?

需求是这样,mysql中使用 binary(16) 存储 uuid,读取到 flink中需要转换成文本串的uuid,sql是这样

select bin_to_uuid(id, true) as text_uuid from usertable

我尝试使用,报错说 bin_to_uuid 找不到*来自志愿者整理的flink邮件归档



参考答案:

不可以的,其中链接[1] 是Flink SQL 支持的所有内置函数,链接[2] 是 Flink SQL 允许自己定义函数,来满足个性化需求。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364606?spm=a2c6h.13066369.question.96.33bf585fSvUDKv



问题五:flink读mysql分库分表

flink读mysql分库分表可以自动识别吗? 还是只能一个一个读?

*来自志愿者整理的flink邮件归档



参考答案:

我没理解错的话你是想一次读出所有表(分库分表)的所有数据, 用一个DDL建表语句搞定,目前还不支持

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364605?spm=a2c6h.13066369.question.97.33bf585fY84CGB

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
23 3
|
1月前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
52 2
|
1月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
22 2
|
1月前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
缓存 监控 Java
Flink CDC产品常见问题之flink集群jps命令报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之用superset连接starrocks报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
flink cdc 增量问题之增量数据会报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之使用cdc-Oracle连接器报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
Flink CDC产品常见问题之使用cdc-Oracle连接器报错如何解决
|
1月前
|
Oracle 关系型数据库 数据处理
Flink CDC产品常见问题之flink postgresqlcdc 报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

相关产品

  • 实时计算 Flink版