Flink输出问题之flink侧输出算子堵住如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink侧输出算子堵住的问题

问题描述:(兄弟部门的问题,不方便截图,请求原谅~~) 1. flink版本: 1.6.0 2. 基本流程:flink读取kafka数据 -> json解析->(process并行度6)往下游11条pipeline发送数据 3. 问题现象: 运行一段时间后,该任务堵住,sink端无数据产生 4. 监控信息: 任务在map->sideprocess算子处出现反压,下游window->sink未出现反压。 map->sideprocess算子task metrics的outputBufferPool偶尔变成1,绝大时间处于0 目前感觉,process(并行度6) ->侧路输出到下游(11条分支), 这种场景下侧路输出是否支持?*来自志愿者整理的flink邮件归档



参考答案:

具体DAG描述不够清晰,看不出啥问题。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371582?spm=a2c6h.13066369.question.62.6ad26382n7OShf



问题二:KeyBy如何映射到物理分区

Hi,请教下各位: 我的场景是现在有个Keyby操作,但是我需要指定某一个key落地在某一个具体物理分区中。 我注意到keyby中得KeySelector仅仅是逻辑的分区,其实还是通过hash的方式来物理分区,没有办法指定哪一个key到哪一个分区去做。 我尝试使用partitionCustom中带有partitioner和keySelector的参数函数,但是发现没有办法直接使用类似Sum一类的聚合函数,实际测试发现Sum会将同一物理分区、但是不同Key的值都累加起来。 例如Tuple2<id,time>,id=1/2/3的给分区0,id=4的给分区1,直接使用sum的话,会将id=1/2/3的time都累加起来。 有什么方法能让keyby方法也能够物理分区吗?还是只能在partitionCustom后给map算子加逻辑使得累加操作正确。*来自志愿者整理的flink邮件归档



参考答案:

自定义分区可以的哈。

你说123都加起来那个不够具体,我猜你是直接用DataStream的sum,自然是全局sum。

分key的聚合必须是keyBy(...).sum(...)这样。

但是,partitionCustom返回的是DataStream。而如果继续keyBy会覆盖partitioner。

所以,你需要自己组装下transformation,也不难的。点进去看看源码怎么组装,抄一抄就可以。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371584?spm=a2c6h.13066369.question.63.6ad26382qxJrCT



问题三:SQL从1.9迁移到1.11的问题

我们目前使用的是 flink 1.9.1 执行 SQL 任务,主要使用了以下几种接口:

1. sqlQuery sqlUpdate: 执行表的创建、查找和写入

1. toAppendStream/toRetractStream:将表转换为流后,通过 DataStream.addSink(new

RichSinkFunction )写入

1. registerDataStream:将流注册为表,下一步使用 sqlQuery/sqlUpdate 读写该表

最后通过 env.execute() 或者 tableEnv.execute() 执行:通过 RichSinkFunction.invoke 或者

sqlUpdate(DML) 更新到存储,这两种输出形式都可能多次调用。

看到文档里,这部分接口 [1][2] 的行为有些变化,实际使用1.11后,有几处困惑想请教:

1. 如果预期混用 SQL/DataStream 的接口,我目前按照3里的介绍,使用 sqlUpdate,然后通过 tEnv.execute()

来输出。具体的,程序设置两个输出,分别是 RichSinkFunction.invoke 以及 sqlUpdate,观察到只有 sqlUpdate

更新了数据,RichSinkFunction 没有执行。如果希望同时输出的话,是必须将 RichSinkFunction.invoke

的部分也都实现为 StreamTableSink 么,是否有其他成本较低的迁移方式?如果按照 1.11 区分 env/tableEnv

的思路,这种情况怎么实现更加合理?

1. 对于这种情况,env.getExecutionPlan 获取的只是调用 DataStream 接口的 DAG 图,如果要获取 Table

操作流程的 DAG,应该通过 tableEnv 的哪个接口获取?

1.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#corrected-execution-behavior-of-tableenvironmentexecute-and-streamtableenvironmentexecute-flink-16363

2.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

3.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-queryc*来自志愿者整理的flink邮件归档



参考答案:

通过 Table 操作流程的 DAG 现在不再会缓存到底层的 exec env 中,为了避免 transformations

污染,所以是拿不到的,但是内部代码我们仍然是先拼接 StreamGraph 然后直接通过 exec env 提交。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371587?spm=a2c6h.13066369.question.64.6ad26382csoS2S



问题四:flink cdc 当mysql表字段修改之后提示没有找到这个字段

当mysql表字段修改之后,再用flink cdc接入,当使用到这个表的时候会提示字段不存在。 Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 53, column 15 to line 53, column 20: Column 'rounds' not found in table 'prcrs' at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664) at com.gaotu.data.performance.flink.job.sql.RegularAnalysis.main(RegularAnalysis.java:234) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 53, column 15 to line 53, column 20: Column 'rounds' not found in table 'prcrs' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101) at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101) at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:884) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6009) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3312) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3302) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 5 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'rounds' not found in table 'prcrs' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ... 49 more*来自志愿者整理的flink邮件归档



参考答案:

Flink SQL 是结构化的 query 语言,目前做不到 schema 自动变更。 所以如果你的 mysql 源的 schema 变更了,那么需要重新定义 Flink DDL,然后重启 Flink SQL 的作业。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371590?spm=a2c6h.13066369.question.65.6ad26382QmhP9G



问题五:Flink SQL传递性

Flink SQL有没有上一个SQL的输出是下一个SQL的输入的业务场景思路?

比如说KafkaSource -> SQL_1 -> SQL_2 -> MysqlSink,一整个链起来,作为一个任务提交运行~*来自志愿者整理的flink邮件归档



参考答案:

创建 view ?试试看*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371592?spm=a2c6h.13066369.question.68.6ad26382XdbZXY

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在Flink算子内部使用异步IO可以通过什么办法实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何在open算子中有办法获取到jobmanager的ip
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何将算子链断开
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
负载均衡 算法 大数据
[flink 实时流基础] 转换算子
[flink 实时流基础] 转换算子
103 2
|
8月前
|
消息中间件 关系型数据库 MySQL
[flink 实时流基础] 输出算子(Sink)
[flink 实时流基础] 输出算子(Sink)
295 1
|
8月前
|
消息中间件 网络协议 Kafka
[flink 实时流基础] flink 源算子
[flink 实时流基础] flink 源算子
|
8月前
|
消息中间件 网络协议 大数据
[flink 实时流基础]源算子和转换算子
[flink 实时流基础]源算子和转换算子
|
消息中间件 缓存 资源调度
在 Flink 算子中使用多线程如何保证不丢数据?
本人通过分析痛点、同步批量请求优化为异步请求、多线程 Client 模式、Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线程如何保证不丢数据。
在 Flink 算子中使用多线程如何保证不丢数据?
|
消息中间件 缓存 资源调度
在 Flink 算子中使用多线程如何保证不丢数据?
本人通过分析痛点、同步批量请求优化为异步请求、多线程 Client 模式、Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线程如何保证不丢数据。
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版