开发者社区> 问答> 正文

在sink 执行notifyCheckpointComplete 方法时能否收到上游

在如下代码中: FCombine 执行snapshot collect 发送数据之后如果不执行sleep 则 FSubmit 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。 如果在 FCombine 执行snapshot collect 发送数据之后如果执行sleep, 在执行 notifyCheckpointComplete 方法时 则就可以收到 snapshot collect 发送的数据。 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行 notifyCheckpointComplete 之前就会收到 上游 snapshot collect 发送数据(因为 snapshot collect 在前,广播 barrier 在后,然后下游在收到了 barrier 才会执行 chekpoint 的相关方法,所以在执行相关方法前 上游 snapshot collect 发出的数据就应该已经到达了下游)。 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。

public class FlinkCheckpointTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment steamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); steamEnv.enableCheckpointing(1000L*2); steamEnv .addSource(new FSource()).setParallelism(4) .transform("开始事务", Types.STRING,new FStart()).setParallelism(1) .process(new FCombine()).name("事务预处理").setParallelism(4) .addSink(new FSubmit()).name("提交事务").setParallelism(1) ; steamEnv.execute("test"); }

static class FSource extends RichParallelSourceFunction { @Override public void run(SourceContext sourceContext) throws Exception { int I =0; while (true){ I = I + 1; sourceContext.collect("thread " + Thread.currentThread().getId() +"-" +I); Thread.sleep(1000); } } @Override public void cancel() {} }

static class FStart extends AbstractStreamOperator implements OneInputStreamOperator<String,String>{ volatile Long ckid = 0L; @Override public void processElement(StreamRecord streamRecord) throws Exception { log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid); output.collect(streamRecord); } @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { log("开启事务: " + checkpointId); ckid = checkpointId; super.prepareSnapshotPreBarrier(checkpointId); } }

static class FCombine extends ProcessFunction<String,String> implements CheckpointedFunction { List ls = new ArrayList (); Collector collector =null; volatile Long ckid = 0L;

@Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { StringBuffer sb = new StringBuffer(); ls.forEach(x->{sb.append(x).append(";");}); log("批处理 " + functionSnapshotContext.getCheckpointId() + ": 时收到数据:" + sb.toString()); Thread.sleep(51000); collector.collect(sb.toString()); ls.clear(); Thread.sleep(51000); //Thread.sleep(20*1000); } @Override public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { } @Override public void processElement(String s, Context context, Collector out) throws Exception { if(StringUtils.isNotBlank(s)){ ls.add(s); } log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + ckid); if(collector ==null){ collector = out; } } }

static class FSubmit extends RichSinkFunction implements /* CheckpointedFunction,*/ CheckpointListener { List ls = new ArrayList (); volatile Long ckid = 0L; @Override public void notifyCheckpointComplete(long l) throws Exception { ckid = l; StringBuffer sb = new StringBuffer(); ls.forEach(x->{sb.append(x).append("||");}); log("submit checkpoint " + l + " over data:list size" + ls.size()+ "; detail" + sb.toString()); ls.clear(); } @Override public void invoke(String value, Context context) throws Exception { if(StringUtils.isNotBlank(value)){ ls.add(value); } log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" + ckid); } } public static void log(String s){ String name = Thread.currentThread().getName(); System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date())+":"+name + ":" + s); } }*来自志愿者整理的flink邮件归档

展开
收起
游客sadna6pkvqnz6 2021-12-07 17:33:51 1188 0
1 条回答
写回答
取消 提交回答
  • 上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。*来自志愿者整理的flink

    2021-12-07 21:09:03
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载