Flink部署问题之不支持SupportsFilterPushDown如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink 1.11版本LeaseRenewer为什么线程不释放?

在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread Dump发现有很多名为LeaseRenewer

的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?

Flink version: 1.11

State backend:filesystem

checkpoint interval: 60s

*来自志愿者整理的flink邮件归档



参考答案:

我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。

你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么? *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370067?spm=a2c6h.13066369.question.33.33bf585fDxwz5N



问题二:ScanTableSource 为什么不支持SupportsFilterPushDown的问题

由于业务需要,我想在flink1.11.2中自定义一个ScanTableSource, 并想实现SupportsFilterPushDown特性。

但是在摸索的过程中发现,以下代码确似乎禁止使用SupportsFilterPushDown特性。

val unsupportedAbilities = List( classOf[SupportsFilterPushDown], classOf[SupportsLimitPushDown], classOf[SupportsPartitionPushDown],

classOf[SupportsComputedColumnPushDown], classOf[SupportsWatermarkPushDown]) unsupportedAbilities.foreach { ability => if (ability.isAssignableFrom(tableSource.getClass)) { throw new UnsupportedOperationException("Currently, a DynamicTableSource with " + s"${ability.getSimpleName} ability is not supported.") } }

SupportsFilterPushDown的介绍,Enables to push down filters into a {@link ScanTableSource}.

这样的话,是不是有点矛盾呢?

我怎么能在ScanTableSource上实现push down filters?

以及实现了SupportsFilterPushDown的source有哪些?

望知道的大佬告知,感谢。

*来自志愿者整理的flink邮件归档



参考答案:

在1.11中,planner 并没有支持下表中的各种PushDown, 所以这里做了check,这是planner层面不支持的。在1.12里,planner层面已经支持了这些PushDown,所以这些check都没有了,用户可以自定义 connector 并实现各种PushDown,比如,1.12中已经支持了kafka source上的watermarkPushdown。

因此,有这类需求建议基于1.12开发。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370068?spm=a2c6h.13066369.question.34.33bf585fD0v31d



问题三:分组查询时,select的字段是否一定要都在group by中吗?

我用Flink SQL 建了一张表,主键也设置了,执行形如"select * from test_table group by 主键 " 会报Expression 'XXX' is not being group的错误,通常来说按主键group by的话不是可以确定唯一性的吗? 难道是因为建表语句中flink的主键约束模式只支持 NOT ENFROCED吗? 这里有点不太明白*来自志愿者整理的flink邮件归档



参考答案:

你的分析是对的,group by pk的query是可以优化到筛选全部列的,这可以是个优化点,只是flink 现在还没有做, 和 Flink pk的 NOT ENFORCED 并没有关系,NOT NEOFRCED是说Flink不持有数据,不像数据库持有数据可以在读取时做校验。 个人感觉这是个小的优化点,如果很急需可以考虑在社区开个issue.

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370069?spm=a2c6h.13066369.question.35.33bf585fG0Vnk6



问题四:flink 使用关系型数据库的默认事务是否可以做到端对端的精确一次,还是需要实现2p2提交

flink 使用关系型数据库的默认事务是否可以做到端对端的精确一次,还是需要实现2p2提交;

自己实现sink开启数据库事务,遇到错误回滚并抛出异常,是否可以实现数据精确一次

谢谢!*来自志愿者整理的flink邮件归档



参考答案:

写入关系型数据库是可以做到端到端的一致性的,默认是不支持的,需要实现两阶段提交,按照你的思路是可行的。另外社区也有人在做这个feature[1],已经有PR了,你可以参考,预计会在1.13里支持。 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370070?spm=a2c6h.13066369.question.36.33bf585f10Sul4



问题五:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存怎么处理?

最近刚刚尝试使用flink 1.9.1 的RocksDB做增量checkpoints;

在程序种设置:

val backend = new RocksDBStateBackend("hdfs://xx/", true) backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)

并用MapState保存中间状态;(中间状态大概10个G); 我启动程序时,给taskmanager设置了3G内存:“-ytm 3072m \”,但是我的程序每跑一段时间都会报出超出物理内存的错误:"is running beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory used; 6.2 GB of 14.6 TB virtual memory used"

我对此有点不解,RocksDB不是会定期把状态写到hdfs么?为什么内存占用会越来越大,最终被yarn kill掉呢?难道是我漏掉了什么参数配置?希望各位能指点迷津~谢谢大家

*来自志愿者整理的flink邮件归档



参考答案:

RocksDB只是将数据可以存储在磁盘上,Flink再周期性将磁盘上数据上传到HDFS,内存中还是有LSM的write buffer以及block cache,也还是需要使用内存的

建议升级Flink版本到1.10+,引入了managed memory功能,理论上对于内存控制是要好很多的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370071?spm=a2c6h.13066369.question.37.33bf585fsjy1II

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
113 3
|
3月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
149 0
|
5月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
5月前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
5月前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
5月前
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
|
5月前
|
Java 流计算
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
|
5月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
5月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
5月前
|
流计算
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免

相关产品

  • 实时计算 Flink版