开发者社区> 问答> 正文

关于 flink addSink(new MySink ) 问题咨询 #Flink

关于 flink addSink(new MySink ) 问题咨询

val resDs = ds .map(new DwRichMapFunction) .keyBy(_.tableName) .timeWindowAll(Time.seconds(5)) .process(new DwProcessAllWindowFunction) .addSink(new MySink)

class MySink extends RichSinkFunction[String] {

override def open(parameters: Configuration): Unit = {   println("===========sink open ===========") }
override def invoke(elements: List[String], context: SinkFunction.Context[_]): Unit = {}
override def close(): Unit = {}

}

问题一: 为什么在启动的时候 会进入 MySink 的open 4次? 问题二: 启动后的每一个window 会不会再次进入 MySink的 open 方法, 目测现在日志打印好像没有; 只进入 invoke 方法 #Flink

展开
收起
黄一刀 2021-01-24 09:06:12 1283 0
1 条回答
写回答
取消 提交回答
  • 1,看你sink并行度。2,open应该是只会打开一次

    2021-01-24 09:07:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载