On the Flink CDC side I tested mongo-cdc and postgres-cdc restarting from a fixed, saved checkpoint. For example, after the first run has consumed all the data in a table, a second start should in theory return no data. Test result: postgres behaves as expected, but mongo fetches the full data set on every start. Could someone help take a look? The code is almost identical; the Postgres version is above and the Mongo version below.
DebeziumDeserializationSchema<String> deserializer =
new JsonDebeziumDeserializationSchema();
JdbcIncrementalSource<String> postgresIncrementalSource =
PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
.hostname("**********")
.port(3433)
.database("bushu2")
.schemaList("schema_test")
.tableList("schema_test.materialized_view_table_test")
.username("postgres")
.password("test")
.slotName("flink")
.decodingPluginName("wal2json") // use pgoutput for PostgreSQL 10+
.deserializer(deserializer)
.includeSchemaChanges(true) // output the schema changes as well
.splitSize(2) // the split size of each snapshot split
.build();
Configuration conf = new Configuration();
conf.setInteger(HEARTBEAT_TIMEOUT.key(), 60 * 1000 * 60);
String pointPath = "file:///Users/chuxiangfeng/git/api-aggergator/flink-extend/checkpoint-dir";
conf.setString("execution.savepoint.path", pointPath + "/fd72014d4c864993a2e5a9287b4a9c5d/chk-4");
conf.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, "fd72014d4c864993a2e5a9287b4a9c5d");
conf.setInteger(RestOptions.PORT, 8088);
conf.setInteger("state.checkpoints.num-retained", 1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointStorage(new FileSystemCheckpointStorage(pointPath));
DataStreamSource<String> ds = env.fromSource(
postgresIncrementalSource,
WatermarkStrategy.noWatermarks(),
"PostgresParallelSource")
.setParallelism(1);
ds.print();
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("*****************")
.setPort(5672)
.setVirtualHost("test")
.setPassword("rabbitmq")
.setUserName("rabbitmq")
.build();
ds.addSink(new RMQSink<String>(
connectionConfig, // config for the RabbitMQ connection
"stream_test", // name of the RabbitMQ queue to send messages to
new SimpleStringSchema())); // use parallelism 1 for sink to keep message ordering
env.execute();
==============================================================================================
SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
.hosts("*")
.username("*")
.password("*")
.databaseList("ods_sse") // set captured database, support regex
.collectionList("ods_sse.*") //set captured collections, support regex
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
Configuration conf = new Configuration();
conf.setInteger(HEARTBEAT_TIMEOUT.key(), 60 * 1000 * 60);
String id = "fd72014d4c864993a2e5a9287b4a9c56";
String checkId = "24";
String pointPath = "/Users/chuxiangfeng/git/api-aggergator/flink-extend/checkpoint-dir";
conf.setString("execution.savepoint.path", "file://" + pointPath + "/" + id + "/chk-" + checkId);
// checkMedata(pointPath + "/" + id + "/chk-" + checkId + "/_metadata");
conf.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, id);
conf.setInteger(RestOptions.PORT, 8088);
conf.setInteger("state.checkpoints.num-retained", 1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file://" + pointPath));
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("*****************")
.setPort(5672)
.setVirtualHost("test")
.setPassword("rabbitmq")
.setUserName("rabbitmq")
.build();
env.addSource(sourceFunction).addSink(new RMQSink<String>(
connectionConfig, // config for the RabbitMQ connection
"stream_test", // name of the RabbitMQ queue to send messages to
new SimpleStringSchema())).setParallelism(1);
env.execute();
Here are some suggestions to help you troubleshoot:
Check the MongoDB logs for exceptions or error messages.
Check the MongoDB configuration, especially the replica set and sharding settings, to make sure the data is where it is expected to be.
Check the Flink logs for exceptions or error messages.
Check the Flink configuration, especially the CDC-related settings, to make sure Flink can read from MongoDB correctly (see the sketch after this list).
Check the Debezium logs for exceptions or error messages.
Check the Debezium configuration, especially the MongoDB-related settings, to make sure Debezium can read from MongoDB correctly.
Review your own code for anything that may have been missed.
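One concrete difference worth comparing: the Postgres snippet uses the incremental source through env.fromSource(...), while the Mongo snippet goes through the legacy SourceFunction and env.addSource(...). Below is a minimal sketch, assuming Flink CDC 2.3+ where the incremental com.ververica.cdc.connectors.mongodb.source.MongoDBSource is available; the host and credentials are placeholders. Like the PostgresIncrementalSource above, this source keeps its offset/split state in checkpoints, so it is worth testing whether the restart-from-checkpoint behaviour differs with it. This is a sketch for comparison, not a confirmed fix.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoIncrementalSourceSketch {
    public static void main(String[] args) throws Exception {
        // Incremental MongoDB source (new Source API), mirroring the PostgresIncrementalSource setup above.
        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("localhost:27017")        // placeholder host
                .username("mongouser")           // placeholder credentials
                .password("mongopw")
                .databaseList("ods_sse")         // captured database, regex supported
                .collectionList("ods_sse.*")     // captured collections, regex supported
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);           // source state (resume token / snapshot splits) is stored in checkpoints

        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(1)
                .print();

        env.execute("MongoDB incremental CDC sketch");
    }
}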