这是报错
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
    at akka.dispatch.OnComplete.internal(Future.scala:300)
    at akka.dispatch.OnComplete.internal(Future.scala:297)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=60000)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.validateAndLoadDatabaseHistory(StatefulTaskContext.java:184)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:128)
    at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:112)
    at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:76)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:114)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:80)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 6 more
这是我的配置代码
 protected Properties getDebeziumProperties(List fullTableNames) {
        Properties debeziumProps = new Properties();
    // ==================== 核心转换器和数据类型配置 ====================
    // 转换器配置
    debeziumProps.setProperty("value.converter", "org.apache.kafka.connect.json.JsonConverter");
    debeziumProps.setProperty("value.converter.schemas.enable", "true");
    // 数据类型处理
    debeziumProps.setProperty("value.converter.debezium.data.date.handling.mode", "string");
    debeziumProps.setProperty("value.converter.debezium.data.datetime.handling.mode", "string");
    debeziumProps.setProperty("value.converter.debezium.data.timestamp.handling.mode", "string");
    debeziumProps.setProperty("decimal.handling.mode", "string");
    debeziumProps.setProperty("bigint.unsigned.handling.mode", "long");
    // 1. 指定历史记录的存储方式(文件存储)
    String historyFile = ensureHistoryFilePath(config.getDebeziumHistoryFile());
    debeziumProps.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
    debeziumProps.setProperty("database.history.file.filename", historyFile);
    debeziumProps.setProperty("database.history.file.auto.create", "true");
    debeziumProps.setProperty("database.history.store.only.captured.tables.ddl", "true");  // 新增新参数
    debeziumProps.setProperty("logging.level.io.debezium.connector.mysql", "DEBUG");
    debeziumProps.setProperty("logging.level.io.debezium.relational.history", "DEBUG");
    // 新增:开启历史文件存储的DEBUG日志
    debeziumProps.setProperty("logging.level.io.debezium.relational.history.FileDatabaseHistory", "DEBUG");
    // 开启DDL处理的DEBUG日志(确认是否有DDL需要写入)
    debeziumProps.setProperty("logging.level.io.debezium.relational.HistorizedRelationalDatabaseSchema", "DEBUG");
    // 新增:强制历史文件即时写入,禁用缓冲(解决Windows下缓冲不刷新问题)
    debeziumProps.setProperty("database.history.file.flush.interval.ms", "1");  // 每1ms刷新一次缓冲
    debeziumProps.setProperty("database.history.file.buffer.size", "1");       // 缓冲大小设为1字节(强制即时写)
    // 新增:禁用DDL过滤,确保所有监控表的DDL都被写入(排除过滤逻辑导致的漏写)
    debeziumProps.setProperty("database.history.skip.unparseable.ddl", "false");  // 不跳过无法解析的DDL(默认false,显式配置更稳妥)
    // ==================== 表过滤配置 ====================
    List<String> fullTableList = new java.util.ArrayList<>();
    for (String tableName : fullTableNames) {
        fullTableList.add(config.getMysqlDatabase() + "." + tableName);
    }
    String tableIncludeList = String.join(",", fullTableList);
    debeziumProps.setProperty("table.include.list", tableIncludeList);
    // ==================== 性能优化配置 ====================
    debeziumProps.setProperty("max.batch.size", "1024");
    debeziumProps.setProperty("poll.interval.ms", "100");
    // ==================== 快照配置 ====================
    debeziumProps.setProperty("snapshot.mode", config.getSnapShotMode());
    debeziumProps.setProperty("snapshot.locking.mode", "none");
    // ==================== 快照配置(全量同步重点) ====================
    debeziumProps.setProperty("snapshot.fetch.size", "2000");
    // 全量同步专用优化
    debeziumProps.setProperty("snapshot.max.threads", "1");
    debeziumProps.setProperty("snapshot.select.statement.overrides", "");
    // ==================== 🔥 心跳管理和连接优化配置 ====================
    // 1. 心跳配置 - 保持连接活跃(关键配置)
    debeziumProps.setProperty("heartbeat.interval.ms", "10000");
    debeziumProps.setProperty("heartbeat.topics.prefix", "__debezium_heartbeat");
    // 2. 连接保活配置
    debeziumProps.setProperty("connect.keep.alive", "true");
    debeziumProps.setProperty("connect.keep.alive.interval.ms", "15000");
    // 3. 连接超时和重试配置
    debeziumProps.setProperty("connect.timeout.ms", "30000");
    debeziumProps.setProperty("connect.backoff.initial.delay.ms", "1000");
    debeziumProps.setProperty("connect.backoff.max.delay.ms", "60000");
    debeziumProps.setProperty("connect.max.attempts", "3");
    // 4. Binlog 读取和网络超时
    debeziumProps.setProperty("binlog.read.timeout.ms", "30000");
    debeziumProps.setProperty("gtid.source.filter.dml.events", "true");
    debeziumProps.setProperty("socket.timeout.ms", "60000");
    debeziumProps.setProperty("tcpKeepAlive", "true");
    // 5. 任务关闭和资源清理配置
    debeziumProps.setProperty("task.shutdown.graceful.timeout.ms", "30000");
    debeziumProps.setProperty("offset.flush.interval.ms", "60000");
    debeziumProps.setProperty("offset.flush.timeout.ms", "10000");
    // 6. 错误处理和容错配置
    debeziumProps.setProperty("errors.retry.delay.max.ms", "60000");
    debeziumProps.setProperty("errors.retry.timeout", "300000");
    debeziumProps.setProperty("errors.tolerance", "all");
    debeziumProps.setProperty("errors.log.enable", "true");
    debeziumProps.setProperty("errors.log.include.messages", "true");
    // 7. 监控和指标配置
    debeziumProps.setProperty("metrics.enabled", "true");
    debeziumProps.setProperty("metrics.registry.name", "flink-cdc-metrics");
    // 8. 内存和缓冲区优化
    debeziumProps.setProperty("buffer.memory.bytes", "67108864");
    debeziumProps.setProperty("batch.size", "1024");
    // 9. MySQL 特定优化
    debeziumProps.setProperty("database.connectionTimeZone", "Asia/Shanghai");
    debeziumProps.setProperty("database.sslMode", "disabled");
    debeziumProps.setProperty("database.allowPublicKeyRetrieval", "true");
    log.info("==================== Debezium 优化配置 ====================");
    debeziumProps.forEach((key, value) -> log.info("{} = {}", key, value));
    log.info("==========================================================");
    return debeziumProps;
}
private String ensureHistoryFilePath(String path) {
    File file = new File(path);
    // 如果是相对路径,转换为绝对路径
    if (!file.isAbsolute()) {
        String absolutePath = new File(file.getAbsolutePath()).getAbsolutePath();
        log.info("相对路径 {} 转换为绝对路径: {}", path, absolutePath);
        return absolutePath;
    }
    // 确保父目录存在
    File parent = file.getParentFile();
    if (!parent.exists()) {
        boolean created = parent.mkdirs();
        log.info("创建历史文件目录 {}: {}", parent.getAbsolutePath(), created ? "成功" : "失败");
    }
    if (file.exists()) {
        log.info("历史文件已存在,路径:{},大小:{}KB", file.getAbsolutePath(), file.length() / 1024);
    } else {
        log.info("历史文件尚未创建,路径:{}(将在快照阶段自动生成)", file.getAbsolutePath());
    }
    return file.getAbsolutePath();
}
其中 mode是initial
@Override
public MySqlSource<String> build(List<String> fullTableNames) {
    log.info("=== 启动统一CDC源,监听表:{} ===", String.join(", ", fullTableNames));
    Properties jdbcProps = getJdbcProperties();
    Properties debeziumProps = getDebeziumProperties(fullTableNames);
    StartupOptions startupOptions = determineStartupMode();
    return MySqlSource.<String>builder()
        .hostname(config.getMysqlHostname())
        .port(config.getMysqlPort())
        .databaseList(config.getMysqlDatabase())
        .tableList(fullTableNames.toArray(new String[0]))
        .username(config.getMysqlUsername())
        .password(config.getMysqlPassword())
        .serverId(generateServerIdRange())
        .serverTimeZone("Asia/Shanghai")
        .startupOptions(startupOptions)
        .fetchSize(1024)
        .connectionPoolSize(2)
        .deserializer(new JsonDebeziumDeserializationSchema())
        .jdbcProperties(jdbcProps)
        .debeziumProperties(debeziumProps)
        .includeSchemaChanges(true)
        .build();
}
CDC的配置是这样的 , 其中startupOptions也是initial
我可以正常读取对应数据,然后也可以读取到表结构实现数据的全量同步转增量同步,但是唯独却无法落盘
文件权限没有问题20:21:11.679 [main] INFO com.taooxi.aquila.fullySync.config.BasicConfig - 读取BASIC-JOB配置
20:21:11.679 [main] INFO com.taooxi.aquila.fullySync.config.BasicConfig - BASIC-JOB 配置: jobName=basic-job, debeziumHistoryFile=D:/taooxi/debezium/basic/debezium-db-history.dat, checkpointPath=D:\taooxi\checkpoint\basic, errorDataPath=D:\taooxi\error\basic, mysqlServerId=1000
20:21:11.680 [main] INFO com.taooxi.aquila.fullySync.config.SyncConfig - ✅ 成功从classpath加载配置:config/sync-db.properties
20:21:11.680 [main] INFO com.taooxi.aquila.fullySync.config.SyncConfig - ==================== 加载的配置信息 ====================
20:21:11.680 [main] INFO com.taooxi.aquila.fullySync.config.SyncConfig - Debezium配置:historyFile=D:/taooxi/debezium/basic/debezium-db-history.dat
20:21:11.680 [main] INFO com.taooxi.aquila.fullySync.config.SyncConfig - Flink作业配置:name=basic-job, parallelism=1, checkpointInterval=10000ms, checkpointPath=D:\taooxi\checkpoint\basic, errorDataPath=D:\taooxi\error\basic
20:21:11.680 [main] INFO com.taooxi.aquila.fullySync.config.SyncConfig - =======================================================
20:21:11.722 [main] INFO com.taooxi.aquila.fullySync.job.AbstractJob - === 同步表名列表:[customer] ===
20:21:11.937 [main] INFO com.taooxi.aquila.fullySync.util.env.FlinkEnvUtil - 配置checkpoint存储路径: file:///D:/taooxi/checkpoint/basic
20:21:11.979 [main] INFO com.taooxi.aquila.fullySync.util.env.FlinkEnvUtil - 使用RocksDB状态后端
20:21:11.994 [main] INFO com.taooxi.aquila.fullySync.util.env.FlinkEnvUtil - 状态TTL配置: 86400000ms (需要在各状态描述符中单独应用)
20:21:11.995 [main] INFO com.taooxi.aquila.fullySync.util.env.FlinkEnvUtil - ✅ 重启策略配置: 最大重启次数=3, 重启间隔=60000ms
20:21:11.996 [main] INFO com.taooxi.aquila.fullySync.util.env.FlinkEnvUtil - ✅ 性能优化配置: 水印间隔=1000ms, 对象重用=enabled
20:21:12.000 [main] INFO com.taooxi.aquila.fullySync.source.UnifiedCdcSourceBuilder - === 启动统一CDC源,监听表:customer ===
20:21:12.001 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - 历史文件已存在,路径:D:\taooxi\debezium\basic\debezium-db-history.dat,大小:0KB
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - ==================== Debezium 优化配置 ====================
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - snapshot.locking.mode = none
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - errors.log.include.messages = true
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - binlog.read.timeout.ms = 30000
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.history.file.flush.interval.ms = 1
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - metrics.registry.name = flink-cdc-metrics
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - logging.level.io.debezium.relational.history.FileDatabaseHistory = DEBUG
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - poll.interval.ms = 100
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.history.store.only.captured.tables.ddl = true
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - heartbeat.topics.prefix = __debezium_heartbeat
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - errors.log.enable = true
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - snapshot.fetch.size = 2000
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - errors.retry.timeout = 300000
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - value.converter.debezium.data.datetime.handling.mode = string
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - snapshot.select.statement.overrides =
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - logging.level.io.debezium.relational.history = DEBUG
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - heartbeat.interval.ms = 10000
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - socket.timeout.ms = 60000
20:21:12.002 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.history.file.auto.create = true
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - value.converter.schemas.enable = true
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - gtid.source.filter.dml.events = true
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - errors.tolerance = all
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - buffer.memory.bytes = 67108864
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - max.batch.size = 1024
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - connect.keep.alive = true
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.history = io.debezium.relational.history.FileDatabaseHistory
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - snapshot.mode = initial
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - connect.timeout.ms = 30000
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.sslMode = disabled
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - value.converter.debezium.data.timestamp.handling.mode = string
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - value.converter.debezium.data.date.handling.mode = string
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - tcpKeepAlive = true
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - task.shutdown.graceful.timeout.ms = 30000
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - metrics.enabled = true
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - decimal.handling.mode = string
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.history.skip.unparseable.ddl = false
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - value.converter = org.apache.kafka.connect.json.JsonConverter
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.history.file.buffer.size = 1
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - connect.backoff.initial.delay.ms = 1000
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.allowPublicKeyRetrieval = true
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - logging.level.io.debezium.relational.HistorizedRelationalDatabaseSchema = DEBUG
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.history.file.filename = D:\taooxi\debezium\basic\debezium-db-history.dat
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - bigint.unsigned.handling.mode = long
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - connect.max.attempts = 3
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - batch.size = 1024
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - offset.flush.timeout.ms = 10000
20:21:12.004 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - errors.retry.delay.max.ms = 60000
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - snapshot.max.threads = 1
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - connect.backoff.max.delay.ms = 60000
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - offset.flush.interval.ms = 60000
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - database.connectionTimeZone = Asia/Shanghai
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - connect.keep.alive.interval.ms = 15000
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - logging.level.io.debezium.connector.mysql = DEBUG
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - table.include.list = taooxi.customer
20:21:12.005 [main] INFO com.taooxi.aquila.fullySync.source.CdcSourceBuilder - ==========================================================
20:21:12.007 [main] DEBUG com.taooxi.aquila.fullySync.source.UnifiedCdcSourceBuilder - Checkpoint检查: 目录=D:\taooxi\checkpoint\basic, 有效checkpoint数=0
20:21:12.007 [main] INFO com.taooxi.aquila.fullySync.source.UnifiedCdcSourceBuilder - 🎆 [首次运行] 无Checkpoint,使用全量+增量模式(initial)
20:21:12.042 [main] INFO com.taooxi.aquila.fullySync.source.UnifiedCdcSourceBuilder - 🔧 [SERVER-ID-FIX] 使用Server ID范围: 1000-1001 (解决并行冲突)
20:21:12.161 [main] INFO com.taooxi.aquila.fullySync.process.TableRoutingProcessor - 注册表 customer 的侧输出流标签:customer-side-output
20:21:12.162 [main] INFO com.taooxi.aquila.fullySync.process.TableRoutingProcessor - 表路由处理器初始化完成,共注册 1 个同步表(含分表规则)
开始启动的时候也可以读取到对应的历史文件的存放路径
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。