关于 flink addSink(new MySink ) 问题咨询
val resDs = ds .map(new DwRichMapFunction) .keyBy(_.tableName) .timeWindowAll(Time.seconds(5)) .process(new DwProcessAllWindowFunction) .addSink(new MySink)
class MySink extends RichSinkFunction[String] {
override def open(parameters: Configuration): Unit = { println("===========sink open ===========") }
override def invoke(elements: List[String], context: SinkFunction.Context[_]): Unit = {}
override def close(): Unit = {}
}
问题一: 为什么在启动的时候 会进入 MySink 的open 4次? 问题二: 启动后的每一个window 会不会再次进入 MySink的 open 方法, 目测现在日志打印好像没有; 只进入 invoke 方法 #Flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。