Flink转换问题之DataStream转成table失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:请教大神们关于flink-sql中数据赋值问题

看了官网的示例,发现sql中传入的值都是固定的,我有一个场景是从kafka消息队列接收查询条件,然后通过flink-sql映射hbase表进行查询并写入结果表。我使用了将消息队列映射表再join数据表的方式,回想一下这种方式很不妥,有什么好的方法实现sql入参的动态查询呢?*来自志愿者整理的flink邮件归档



参考答案:

自定义 UDF 可以解决你的问题吗? 比如 接收 kakfa 的数据字段定义成 hbaseQuery,然后自定义 UDF 去根据 query 查询数据。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371605?spm=a2c6h.13066369.question.72.6ad26382lmSgJW



问题二:flink savepoint

本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。

flink 版本1.10.1

执行 flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints

出现错误信息

org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.

at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)

at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)

at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)

at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by: java.util.concurrent.TimeoutException

at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)

at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)*来自志愿者整理的flink邮件归档



参考答案:

这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时, 具体的原因需要看下 Jobmaster 的日志。 PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371606?spm=a2c6h.13066369.question.75.6ad26382XXH3Ol



问题三:关于cluster.evenly-spread-out-slots参数的底层原理

有没有人对cluster.evenly-spread-out-slots参数了解比较深入的给讲解下。

我主要想知道,设置这个参数为true之后。Flink是以一个什么样的规则去尽可能均衡分配的。

standalone集群模式下,每个机器性能相同,flink slot数量配置相同情况下。基于这种分配规则,有没有一种方法让Flink做到

完全均衡,而不是尽可能均衡

此外,我说的“均衡”都特指算子级别的均衡。不要5机器一共5个slot,然后任务有5个算子,每个算子单并发并且通过不同的share

group各独占1个slot这种均衡。我指的是每个算子都均衡到机器(假设并发设置合理)。*来自志愿者整理的flink邮件归档



参考答案:

我说一下我看源码(1.11.2)之后的理解吧,不一定准确,仅供参考。

cluster.evenly-spread-out-slots 这个参数设置后会作用在两个地方:

  1. JobMaster 的 Scheduler 组件
  2. ResourceManager 的 SlotManager 组件

对于 JobMaster 中的 Scheduler,

它在给 execution vertex 分配 slot 是按拓扑排序的顺序依次进行的。

Scheduler 策略是会倾向于把 execution vertex 分配到它的上游节点所分配到的slot上,

因此在给某个具体 execution vertex 分配 slot 时都会计算出一个当前节点倾向于选择的TaskManager集合,

然后在可选的 slot 候选集中会根据三个维度来为某个slot打分,分别是:

  1. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 ResourceID

是相同的(对于standalone模式可以不考虑该维度)

  1. 候选slot所在的 TaskManager 与倾向于选择的 TaskManager 集合中有多少个的 全限定域名 是相同的
  2. 候选slot所在的 TaskManager 目前的资源占用率

只有配置了 cluster.evenly-spread-out-slots 后,才会考虑第三个维度,否则仅会用前面两个维度进行打分。

打分之后会选择得分最高的 slot 分配给当前的 exection vertex。

需要注意的是这里的资源利用率只是根据某个 slot 所在的 TaskManager 中剩下多少个能够分配该 execution vertex 的

slot 计算出的,

(因为 Flink 要求同一 job vertex 的并行任务不能分配到同一 slot 中),能分配的越多,资源利用率越小,否则利用率越大。

而不是指实际的CPU内存等资源利用率。

对于 ResourceManager 中的 SlotManager 组件(这里说的都是 Standalone 模式下的

ResourceManager),

由于 JobMaster 的 slot 都是要向 resource manager 申请的。如果 JobMaster 需要新的 slot 了,会向

ResourceManager 的 SlotManager 组件申请。

如果没有配置 cluster.evenly-spread-out-slots 的话,SlotManager 从可用 slot 中随机返回一个。

如果配置了 cluster.evenly-spread-out-slots,SlotManager 会返回资源利用率最小的一个 slot。

这里的资源利用率计算方式是:看某个 slot 所在的 TaskManager 中有多少 slot 还没有被分配,空闲的越多,利用率越小,否则越大。

最后,你提问中说的均衡我没有太理解。某个算子的并发子任务是不会被分配到同一个slot中的,

但如果想把这些子任务均匀分配到不同机器上,这个当前的调度算法应该是无法保证的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371608?spm=a2c6h.13066369.question.74.6ad26382bJV2or



问题四:flink 1.11 cdc: 如何将DataStream<RowData> 要如何转

目前有两个DataStream 的流,通过mapfunction, 转成DataStream 流,请问DataStream 怎么转成table,并使用flink sql进行操作。 (注:因为mapfunction对流做了些顺序的限制,目前无法无法直接用flink sql cdc直接定义表!!!)

目前我的做法会报错:

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

DataStreamSource json1 // canal json的格式 DataStreamSource json2 // canal json的格式 ConnectedStreams<String, String> connect= caliber_cdc_json.connect(caliber_snapshot_json); //connect DataStream snapshot_cdc_stream = connect.flatMap( new SnapshotCdcCoRichFlatMapFunction() ); //做连接

//3, 注册表,将表数据,直接输出 Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream); fsTableEnv.createTemporaryView("test", snapshot_cdc_table);

String output = "CREATE TABLE test_mirror (\n" + "id INT,\n" + "name VARCHAR(255),\n" + "time TIMESTAMP(3),\n" + "PRIMARY KEY(id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'print'\n" + ")";

//4, app logic String op = "INSERT into test_mirror SELECT * from test"; fsTableEnv.executeSql(output); fsTableEnv.executeSql(op);

但提交任务失败,错误信息: serializationSchema:root |-- id: INT NOT NULL |-- name: VARCHAR(255) |-- time: TIMESTAMP(3) |-- status: INT |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

snapshot_cdc_table:UnnamedTable$0 +----------------+ | table name | +----------------+ | UnnamedTable$0 | | test | | test_mirror | +----------------+ 3 rows in set


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown Source) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) at com.qqmusic.quku.demo_app.StreamTableSql.main(StreamTableSql.java:126) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ... 9 more

请问是啥原因?需要怎么做?*来自志愿者整理的flink邮件归档



参考答案:

  1. 目前不支持注册一个 RowData 类型的 DataStream,因为 RowData 会被识别成 非结构化类型。
  2. 目前不支持注册一个 cdc stream,也就是说 DataStream -> Table 只支持 insert-only stream,无法识别 cdc 流。这个功能规划在了1.13 版本中。

对于你的场景,有以下几种解决办法: 1. 如果你的流中只有插入和更新,没有删除。那么用 DataStream 先注册成一个 insert-only 的 Table,然后用 Flink SQL 的去重语法 [1] 保留 pk 下的最后一条记录即可。 2. 如果你的流中有删除,那么....你得自己开发一个 sql connector,把 cdc 抓取以及“mapfunction对流做了些顺序的限制”的逻辑实现在你的 source 中。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371610?spm=a2c6h.13066369.question.75.6ad26382iS6iUL



问题五:flink1.11的cdc功能对消息顺序性的处理

麻烦请教下各位大神,flink如何处理如下问题:

flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。

如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。

假如

1.有源表和目标表:

create table test(

id int(10) primary key

)

2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。

3.发往的topic下有三个partition:p0、p1、p2

4.源端和目标端都有一条记录id=1

此时对源端进行两次update:

update1:update test set id=2 where id=1;

update2: update test set id=3 wehre id=2;

假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。*来自志愿者整理的flink邮件归档



参考答案:

可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q&A 中描述了 "首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink 读取的时候才能保证顺序。"

个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?

[1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371611?spm=a2c6h.13066369.question.78.6ad26382nJnm13

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 消息中间件 存储
Flink报错问题之Flink报错:Table sink 'a' doesn't support consuming update and delete changes which is produced by node如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
3月前
|
缓存 NoSQL 数据库
Flink cdc到doris,starrocks,table store
Flink cdc到doris,starrocks,table store
|
3月前
|
JSON 关系型数据库 MySQL
这个问题是由于Flink的Table API在处理MySQL数据时,将MULTISET类型的字段转换为了JSON格式
【1月更文挑战第17天】【1月更文挑战第84篇】这个问题是由于Flink的Table API在处理MySQL数据时,将MULTISET类型的字段转换为了JSON格式
34 1
|
3月前
|
消息中间件 Java Kafka
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
36 0
|
8月前
|
存储 API 流计算
Flink DataStream API-概念、模式、作业流程和程序
前几篇介绍了Flink的入门、架构原理、安装等,相信你对Flink已经了解入门。接下来开始介绍Flink DataStream API内容,先介绍DataStream API基本概念和使用,然后介绍核心概念,最后再介绍经典案例和代码实现。本篇内容:Flink DataStream API的概念、模式、作业流程和程序。
Flink DataStream API-概念、模式、作业流程和程序
|
4月前
|
SQL 传感器 分布式计算
Flink(五)【DataStream 转换算子(上)】
Flink(五)【DataStream 转换算子(上)】
|
4月前
|
消息中间件 Java Kafka
Flink(四)【DataStream API - Source算子】
Flink(四)【DataStream API - Source算子】
|
8月前
|
消息中间件 存储 分布式计算
Flink之DataStream API(执行环境、数据源、读取kafka)
Flink之DataStream API(执行环境、数据源、读取kafka)
922 0
|
存储 SQL 分布式计算
Flink Table Store 独立孵化启动 ,Apache Paimon 诞生
2023 年 3 月 12 日,Flink Table Store 项目顺利通过投票,正式进入 Apache 软件基金会 (ASF) 的孵化器,改名为 Apache Paimon (incubating)。
Flink Table Store 独立孵化启动 ,Apache Paimon 诞生
|
存储 消息中间件 SQL
Flink Table Store 0.3 构建流式数仓最佳实践
阿里巴巴高级技术专家,Apache Flink PMC 李劲松(之信),在 FFA 2022 实时湖仓的分享。
Flink Table Store 0.3 构建流式数仓最佳实践

相关产品

  • 实时计算 Flink版