Question 1: How can I count the amount of data Flink has successfully consumed from Kafka in the last n hours?
Hi all,
I need to count how many records Flink has successfully consumed from Kafka over the last n hours (for example, 24 hours). Is there a good way to do this? *From the Flink mailing list archive, compiled by volunteers
Suggested answer:
You can, at metric-reporting time or before the metrics are persisted, subtract the source's numRecordsOut values from two consecutive reporting intervals to get per-interval deltas, and then simply accumulate those deltas by time bucket when presenting the result.
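For illustration, a minimal sketch of that delta-and-accumulate idea, assuming you periodically sample the source operator's numRecordsOut counter; the Sample structure, the millisecond timestamps, and the counter-reset handling are assumptions of this sketch, not part of the original answer:

// One periodic sample of the source's numRecordsOut counter
// (hypothetical structure; collect these from your metric pipeline).
final case class Sample(timestampMs: Long, numRecordsOut: Long)

object ConsumedRecords {
  // Sum of records consumed in the last `hours` hours, given counter samples
  // ordered by timestamp. A decrease between samples is treated as a counter
  // reset (e.g. a job restart), so the new value itself counts as the delta.
  def lastNHours(samples: Seq[Sample], hours: Int, nowMs: Long): Long = {
    val cutoffMs = nowMs - hours * 3600L * 1000L
    samples
      .sliding(2)
      .collect { case Seq(prev, curr) if curr.timestampMs >= cutoffMs =>
        val delta = curr.numRecordsOut - prev.numRecordsOut
        if (delta >= 0) delta else curr.numRecordsOut // counter was reset
      }
      .sum
  }
}

For example, lastNHours(samples, 24, System.currentTimeMillis()) gives the last day's total; the samples could come from a custom metric reporter or from polling the counter through Flink's REST API.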
For more answers to this question, see the original post: https://developer.aliyun.com/ask/359871
Question 2: Which connectors can give me a stream that has a primary key but carries no update/delete changes?
Background: I want to try Flink SQL's deduplicate on a stream that has a primary key, and I found:
- If I create the stream with mysql-cdc, it fails with Deduplicate doesn't support consuming update and delete changes....
- If I create the stream with Kafka and the json format, deduplicate itself does not complain, but a primary key cannot be defined; it fails with The Kafka table '...' with 'json' format doesn't support defining PRIMARY KEY constraint on the table
- A simple stream that meets both requirements is datagen with a primary key: it has one, and deduplicate accepts it, but it is far too uncontrollable.
Are there other connectors or formats that support this? It does not seem easy to find out quickly from the documentation. *From the Flink mailing list archive, compiled by volunteers
Suggested answer:
Try changing the json format to debezium-json.
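As a sketch of what that suggestion could look like as DDL (the table name, topic, schema and connection properties below are invented for illustration, and whether the PRIMARY KEY declaration passes validation may depend on the exact Flink version):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object DebeziumJsonExample {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // Hypothetical table over a Kafka topic carrying Debezium-JSON change events.
    tEnv.executeSql(
      """CREATE TABLE orders (
        |  order_id STRING,
        |  amount   DECIMAL(10, 2),
        |  PRIMARY KEY (order_id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'orders',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'debezium-json'
        |)""".stripMargin)
  }
}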
For more answers to this question, see the original post: https://developer.aliyun.com/ask/359872
Question 3: When submitting multiple SQL statements through the Table API, is there an API to check the SQL for errors in advance?
Dear all: When submitting multiple SQL statements through the Table API, is there an API that can check the SQL for errors ahead of time? Right now the errors only show up when the statements are submitted for execution. In principle they could already be caught during SQL parsing; is there a ready-made API for this, or do I have to dig into the parser code myself and wrap it in an API? If there is none, I hope this feature can be provided; Blink appears to have it. *From the Flink mailing list archive, compiled by volunteers
Suggested answer:
You can look at the TableEnvironment#execute logic: if the SQL can be translated into a Transformation, the syntax should be fine.
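One pragmatic pre-flight check, a sketch of my own rather than a ready-made API from the answer, is to wrap TableEnvironment#explainSql, which drives the same parse/validate/plan path without executing anything; note that typically only queries and INSERT statements can be explained, so DDL would still need a different path:

import scala.util.{Failure, Success, Try}
import org.apache.flink.table.api.TableEnvironment

object SqlPreflight {
  // Returns, per statement, None if it parses, validates and can be planned,
  // otherwise the error message. Nothing is actually executed.
  def check(tEnv: TableEnvironment, statements: Seq[String]): Seq[(String, Option[String])] =
    statements.map { sql =>
      Try(tEnv.explainSql(sql)) match {
        case Success(_)  => (sql, None)
        case Failure(ex) => (sql, Option(ex.getMessage))
      }
    }
}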
For more answers to this question, see the original post: https://developer.aliyun.com/ask/370022
Question 4: Flink SQL 1.12.1: accessing a field of a ROW fails, what can be done?
hi, all
I define a ScalarFunction:

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class Test extends ScalarFunction {
  @DataTypeHint("ROW<a STRING, b STRING, c STRING>")
  def eval(): Row = {
    Row.of("a", "b", "c")
  }
}

When the following statement is executed:

select Test().a from taba1

it fails with the error below:
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1276)
at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
... 16 more

Best Regards. *From the Flink mailing list archive, compiled by volunteers
For more answers to this question, see the original post: https://developer.aliyun.com/ask/359483
Question 5: Why does Upsert Kafka require the format to be INSERT-ONLY?
Hi all, I have recently been testing Upsert Kafka, and during verification I found that validation requires the format's changelog-mode to be insert-only. What is the reason for this? If I have misunderstood, please correct me, thanks. *From the Flink mailing list archive, compiled by volunteers
Suggested answer:
The original design was modeled on Kafka's compacted topics, and a compacted topic has its own convention for expressing a changelog; for example, a null value denotes a tombstone message. From that starting point, we interpret the data purely from Kafka's perspective rather than parsing changes out of the format.
This does introduce some issues; for example, when reading with upsert-kafka, a state must be maintained to remember every key that has been read.
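For reference, a minimal upsert-kafka table sketch (the table name, topic and properties are invented): the connector requires a PRIMARY KEY, keeps the latest value per key, and treats a Kafka record with a null value as a tombstone that deletes that key, exactly as described above.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object UpsertKafkaExample {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // Hypothetical table over a compacted topic: the latest value per user_id
    // wins, and a null-valued Kafka record deletes that key.
    tEnv.executeSql(
      """CREATE TABLE user_profiles (
        |  user_id STRING,
        |  name    STRING,
        |  PRIMARY KEY (user_id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'user-profiles',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'json',
        |  'value.format' = 'json'
        |)""".stripMargin)
  }
}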
For more answers to this question, see the original post: https://developer.aliyun.com/ask/359365