1、 maven大包涉及到长期的工程维护问题
现在官网提供的maven打包方式,直接把第三方包解开后按照目录方式存放 而不是维持maven depend on的标准的jar包方式(带版本) 现在这种方式不利于软件的项目长期管理,项目长期累月运行后, 随着人员变化以及版本升级,会带来很多版本兼容和识别的工程问题
期望flink在1.10.0的后续版本改进该问题,可能需要更改运行时的classloader
2、 维度数据通过RichXXXFunction的open重复加载,浪费存储空间的问题
假设启动一个任务并行度是1K,假设平均分配到10台计算主机计算,那么一个TaskManager会有100个slot,执行RichXXFunciton的open方法执行,那么同一台运算主机,就会有10个重复性的数据打开、加载浪费CPU和内存,希望能够做到在同一个TaskManger,按照job groupName或者jobID实现slot启动后预先加载数据,而不是slot所在的每个线程或者RichXXXFuction重复性的加载数据。Open通过本地的方式从启动预加载数据同步阻塞超时方式获取;
3、 TaskManager的在不同任务之间实现软资源隔离的问题
因业务代码写的可能有问题或者OOM,flink做不到像docker那样的资源隔离,但因多个不同的Job共用slot甚至TaskManager,会出现一个JOB出问题,搞挂一个计算节点的问题。如果能在JOBID级别对于RuntimeException尽心按照Job捕获打印异常,而不影响其他的Job,然后转成控制类的XXXEvent完成Job的生命周期的终结以及上报metrics;
4、 异常信息,特别是业务的异常信息,往往被flink框架给掩盖,或者带有checkpoint的failover时,会不断重试。希望把业务的异常信息像单机一样直接暴露,对于一些异常信息提供metrics上报,部分限制重试;
5、 研发过程中的小数据量的逻辑测试和现网超大数据量的逻辑往往不一致
比如双流join、甚至最简单的官网样例WordCount也会有这个问题,需要增强Mock仿真现网的实际
情况,否则带来的问题是更改逻辑,导致上次的savepoint不能再用;
6、 向业务方开放接口,回调方式监听处理过程,业务方干预checkpint和任务的完成
举例:比如我的数据是由一个JOB1独立消费清洗规整后sink落盘到HDFS,支持按大小和时间滚动文件。我的另外一个JOB2持续监听JOB1的sink的HDFS文件和目录当成source,通过实时处理后在sink到hdfs或者其他sink。
sourceFunction的无介入点:
A、 对于JOB2如何按照什么顺序消费HDFS文件我无法干预
B、 无法知道这个文件是否消费完成
C、 无法知道这个文件的消费进度
D、JOB Fail时无法干预 sinkFunction的无介入点:
A、 无法知道什么数据已经checkpoint
B、 如果JOB出现Fail和Restore因flink只对集群内部的state保证只执行一次,但对sink和souce目前缺乏有效的干预方式,因sink和source的差异无法做,为什么不开放给业务处理 Job的无接入点:
A、 因JOB是长期运行的,但业务的处理是由时间或者业务上的完成点。即需要回调由业务方判断业务已经阶段性完成,这些sink的数据可以使用,或者阶段性的终止JOB,而不是只有很粗暴的一种方式cancel job。
我现在只能一次次的刷新hdfs的sink判断是否数据是否无中间状态已经处理完毕,还要对带window的观察,webUI的Records Received和Records Send,因这些数据是在state中缓冲的webUI上看不到,需要等待StreamingFinkSink的RolloverInterval和InactivityInterval的时间过去后去判断业务数据是否处理完毕
7、 TODO
以上部分都是直接用DataStream遇到的问题和找不到观察点和介入接口*来自志愿者整理的FLINK邮件归档
虽然我不是 PMC,但对其中的一些问题我也想来谈一下自己的理解。
你说的是 shaded 的依赖吗?考虑到用户代码本身也可能依赖一些常见的库(例如 guava 等),为了防止和用户代码的版本冲突,Flink 才对常用的库进行了 shade,这样就相当于调用 Flink 自己的代码一样。这个机制正是为了解决版本兼容问题引入的。
open 应该只是进行数据源的连接操作,不同的 slot 处理的是不同的 input split,不会重复读取数据(但的确可能重复连接数据源)。对于 OLAP 场景这个的确是个优化点,但对于 batch / streaming 作业这个影响可能不是特别大。
session 集群的资源隔离是非常困难的,建议使用 per job 模式就能防止这个问题的出现。
这个可以举个例子吗?我理解你是想看历史的 failover 原因,这个在 web ui 里面就有,就在 root exception 的旁边。
无论数据量大小,作业的处理逻辑应该都是一致的。我不太明白这个问题...
我理解这个问题是想要一个 listener 的机制,使得作业运行到一定阶段以后来通知 listener。JM 主动通知 client 是不可行的,因为 client 可能在防火墙或者 NAT 网络里;不过现在有一个正在开发的 Operator Coordiantor 机制,之后会扩展设计,使得 client 可以对 coordinator 进行轮询,一部分程度上可以解决作业与 client 的沟通问题。见 https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface 。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。