开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC mongo每次启动都是获取的全量数据,能帮忙看下么?

Flink CDC这边分别测试mongo-cdc和postgres-cdc从固定保存的某个checkpointing重启执行,比如第一次消费完某个表的数据,第二次启动理论上应该拿不到数据,测试情况:postgres运行正常,符合预期,但是mongo每次启动都是获取的全量数据,能帮忙看下么?代码几乎一样,上边的是pg下边的是mongo
DebeziumDeserializationSchema deserializer =
new JsonDebeziumDeserializationSchema();

    JdbcIncrementalSource<String> postgresIncrementalSource =
            PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                    .hostname("**********")
                    .port(3433)
                    .database("bushu2")
                    .schemaList("schema_test")
                    .tableList("schema_test.materialized_view_table_test")
                    .username("postgres")
                    .password("test")
                    .slotName("flink")
                    .decodingPluginName("wal2json") // use pgoutput for PostgreSQL 10+
                    .deserializer(deserializer)
                    .includeSchemaChanges(true) // output the schema changes as well
                    .splitSize(2) // the split size of each snapshot split
                    .build();
    Configuration conf = new Configuration();
    conf.setInteger(HEARTBEAT_TIMEOUT.key(), 60 * 1000 * 60);
    String pointPath = "file:///Users/chuxiangfeng/git/api-aggergator/flink-extend/checkpoint-dir";
    conf.setString("execution.savepoint.path", pointPath + "/fd72014d4c864993a2e5a9287b4a9c5d/chk-4");
    conf.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, "fd72014d4c864993a2e5a9287b4a9c5d");
    conf.setInteger(RestOptions.PORT, 8088);
    conf.setInteger("state.checkpoints.num-retained", 1);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);


    env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
    CheckpointConfig config = env.getCheckpointConfig();
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(pointPath));

    DataStreamSource ds = env.fromSource(
                    postgresIncrementalSource,
                    WatermarkStrategy.noWatermarks(),
                    "PostgresParallelSource")
            .setParallelism(1);

    ds.print();
    final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("*****************")
            .setPort(5672)
            .setVirtualHost("test")
            .setPassword("rabbitmq")
            .setUserName("rabbitmq")
            .build();
    ds.addSink(new RMQSink<String>(
            connectionConfig,            // config for the RabbitMQ connection
            "stream_test",                 // name of the RabbitMQ queue to send messages to
            new SimpleStringSchema())); // use parallelism 1 for sink to keep message ordering
    env.execute();

==============================================================================================
SourceFunction sourceFunction = MongoDBSource.builder()
.hosts("*")
.username("*")
.password("*")
.databaseList("ods_sse") // set captured database, support regex
.collectionList("ods_sse.*") //set captured collections, support regex
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

    Configuration conf = new Configuration();
    conf.setInteger(HEARTBEAT_TIMEOUT.key(), 60 * 1000 * 60);
    String id = "fd72014d4c864993a2e5a9287b4a9c56";
    String checkId = "24";
    String pointPath = "/Users/chuxiangfeng/git/api-aggergator/flink-extend/checkpoint-dir";

    conf.setString("execution.savepoint.path", "file://" + pointPath + "/" + id + "/chk-" + checkId);

// checkMedata(pointPath + "/" + id + "/chk-" + checkId + "/_metadata");

    conf.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, id);
    conf.setInteger(RestOptions.PORT, 8088);
    conf.setInteger("state.checkpoints.num-retained", 1);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file://" + pointPath));
    final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("*****************")
            .setPort(5672)
            .setVirtualHost("test")
            .setPassword("rabbitmq")
            .setUserName("rabbitmq")
            .build();
    env.addSource(sourceFunction).addSink(new RMQSink<String>(
            connectionConfig,            // config for the RabbitMQ connection
            "stream_test",                 // name of the RabbitMQ queue to send messages to
            new SimpleStringSchema())).setParallelism(1);

    env.execute();

展开
收起
cuicuicuic 2023-10-18 16:03:23 80 0
1 条回答
写回答
取消 提交回答
  • 提供一些建议帮助你排查问题:

    1. 检查MongoDB的日志,看看是否有异常或者错误信息。

    2. 检查MongoDB的配置,特别是关于复制集和分片的配置,确保数据在正确的位置。

    3. 检查Flink的日志,看看是否有异常或者错误信息。

    4. 检查Flink的配置,特别是关于CDC的配置,确保Flink能够正确地从MongoDB中读取数据。

    5. 检查Debezium的日志,看看是否有异常或者错误信息。

    6. 检查Debezium的配置,特别是关于MongoDB的配置,确保Debezium能够正确地从MongoDB中读取数据。

    7. 检查你的代码,看看是否有遗漏的地方。

    2023-10-19 14:31:27
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载