开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

mongodb-cdc 2.4.1版本checkpoint一直是全量开始读取数据

每次启动读取全量数据同步到kafka,cancel job之后,再从checkpoint启动,能够正常启动,也显示从指定的checkpoint启动,但是数据是从头读取的。再次cancel job,对mongodb进行数据增删改,从第二个checkpoint启动,这时不是从头读取数据。
代码如下:
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    MongoDBSource.Builder<String> mongoDBSource = MongoDBSource.<String>builder()
            .hosts(hostName)
            .username(userName)
            .password(password)
            .connectionOptions(connectionOptions)
            .deserializer(new CustomerMongoDeserialization()) //这里需要自定义序列化格式

// .deserializer(new JsonDebeziumDeserializationSchema())
;

    switch (tableFlag){
        case 1:
            mongoDBSource.databaseList(StringUtils.split(mongoDatabaseList, ","));
            break;
        case 2:
            if(StringUtils.isNotBlank(tableList)){
                mongoDBSource.collectionList(StringUtils.split(tableList, ",")); //这个注释,就是多表同步
            }else{
                throw new RuntimeException("多表同步没有表名");
            }
            break;

    }

    env.enableCheckpointing(flinkCheckPointInterval * 1000L);

    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    SourceFunction<String> sourceFunction = mongoDBSource.build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);



    //3.打印数据并将数据写入 Kafka
    if(StringUtils.isNotEmpty(printLog) && "true".equals(printLog)){
        streamSource.print();
    }


    streamSource.addSink(getKafkaProducer(kafkaIpPort, kafkaTopic, kafkaMaxRequestSize)).name("kafkaSink").disableChaining();


    setEnvRestartStrategy(env);

    env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


    //4.启动任务
    try {
        env.execute(flinkJobName);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

展开
收起
游客pbpcyd5spf2lq 2024-01-30 15:40:55 119 0
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
开源数据库 MongoDB 专场 MongoDB疑难杂症分析及优化 立即下载
阿里云MongoDB云服务构建 立即下载
饿了么高级架构师陈东明:MongoDB是如何逐步提高可靠性的 立即下载