问题一:flink侧输出算子堵住的问题
问题描述:(兄弟部门的问题,不方便截图,请求原谅~~) 1. flink版本: 1.6.0 2. 基本流程:flink读取kafka数据 -> json解析->(process并行度6)往下游11条pipeline发送数据 3. 问题现象: 运行一段时间后,该任务堵住,sink端无数据产生 4. 监控信息: 任务在map->sideprocess算子处出现反压,下游window->sink未出现反压。 map->sideprocess算子task metrics的outputBufferPool偶尔变成1,绝大时间处于0 目前感觉,process(并行度6) ->侧路输出到下游(11条分支), 这种场景下侧路输出是否支持?*来自志愿者整理的flink邮件归档
参考答案:
具体DAG描述不够清晰,看不出啥问题。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371582?spm=a2c6h.13066369.question.62.6ad26382n7OShf
问题二:KeyBy如何映射到物理分区
Hi,请教下各位: 我的场景是现在有个Keyby操作,但是我需要指定某一个key落地在某一个具体物理分区中。 我注意到keyby中得KeySelector仅仅是逻辑的分区,其实还是通过hash的方式来物理分区,没有办法指定哪一个key到哪一个分区去做。 我尝试使用partitionCustom中带有partitioner和keySelector的参数函数,但是发现没有办法直接使用类似Sum一类的聚合函数,实际测试发现Sum会将同一物理分区、但是不同Key的值都累加起来。 例如Tuple2<id,time>,id=1/2/3的给分区0,id=4的给分区1,直接使用sum的话,会将id=1/2/3的time都累加起来。 有什么方法能让keyby方法也能够物理分区吗?还是只能在partitionCustom后给map算子加逻辑使得累加操作正确。*来自志愿者整理的flink邮件归档
参考答案:
自定义分区可以的哈。
你说123都加起来那个不够具体,我猜你是直接用DataStream的sum,自然是全局sum。
分key的聚合必须是keyBy(...).sum(...)这样。
但是,partitionCustom返回的是DataStream。而如果继续keyBy会覆盖partitioner。
所以,你需要自己组装下transformation,也不难的。点进去看看源码怎么组装,抄一抄就可以。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371584?spm=a2c6h.13066369.question.63.6ad26382qxJrCT
问题三:SQL从1.9迁移到1.11的问题
我们目前使用的是 flink 1.9.1 执行 SQL 任务,主要使用了以下几种接口:
1. sqlQuery sqlUpdate: 执行表的创建、查找和写入
1. toAppendStream/toRetractStream:将表转换为流后,通过 DataStream.addSink(new
RichSinkFunction )写入
1. registerDataStream:将流注册为表,下一步使用 sqlQuery/sqlUpdate 读写该表
最后通过 env.execute() 或者 tableEnv.execute() 执行:通过 RichSinkFunction.invoke 或者
sqlUpdate(DML) 更新到存储,这两种输出形式都可能多次调用。
看到文档里,这部分接口 [1][2] 的行为有些变化,实际使用1.11后,有几处困惑想请教:
1. 如果预期混用 SQL/DataStream 的接口,我目前按照3里的介绍,使用 sqlUpdate,然后通过 tEnv.execute()
来输出。具体的,程序设置两个输出,分别是 RichSinkFunction.invoke 以及 sqlUpdate,观察到只有 sqlUpdate
更新了数据,RichSinkFunction 没有执行。如果希望同时输出的话,是必须将 RichSinkFunction.invoke
的部分也都实现为 StreamTableSink 么,是否有其他成本较低的迁移方式?如果按照 1.11 区分 env/tableEnv
的思路,这种情况怎么实现更加合理?
1. 对于这种情况,env.getExecutionPlan 获取的只是调用 DataStream 接口的 DAG 图,如果要获取 Table
操作流程的 DAG,应该通过 tableEnv 的哪个接口获取?
1.
2.
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
3.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-queryc*来自志愿者整理的flink邮件归档
参考答案:
通过 Table 操作流程的 DAG 现在不再会缓存到底层的 exec env 中,为了避免 transformations
污染,所以是拿不到的,但是内部代码我们仍然是先拼接 StreamGraph 然后直接通过 exec env 提交。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371587?spm=a2c6h.13066369.question.64.6ad26382csoS2S
问题四:flink cdc 当mysql表字段修改之后提示没有找到这个字段
当mysql表字段修改之后,再用flink cdc接入,当使用到这个表的时候会提示字段不存在。 Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 53, column 15 to line 53, column 20: Column 'rounds' not found in table 'prcrs' at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664) at com.gaotu.data.performance.flink.job.sql.RegularAnalysis.main(RegularAnalysis.java:234) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 53, column 15 to line 53, column 20: Column 'rounds' not found in table 'prcrs' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101) at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101) at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3312) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3302) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 5 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'rounds' not found in table 'prcrs' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ... 49 more*来自志愿者整理的flink邮件归档
参考答案:
Flink SQL 是结构化的 query 语言,目前做不到 schema 自动变更。 所以如果你的 mysql 源的 schema 变更了,那么需要重新定义 Flink DDL,然后重启 Flink SQL 的作业。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371590?spm=a2c6h.13066369.question.65.6ad26382QmhP9G
问题五:Flink SQL传递性
Flink SQL有没有上一个SQL的输出是下一个SQL的输入的业务场景思路?
比如说KafkaSource -> SQL_1 -> SQL_2 -> MysqlSink,一整个链起来,作为一个任务提交运行~*来自志愿者整理的flink邮件归档
参考答案:
创建 view ?试试看*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371592?spm=a2c6h.13066369.question.68.6ad26382XdbZXY