Flink CDC里pg到pg数据同步全量可以成功,增量不行,还报了这个错误是为啥?

Flink CDC里pg到pg数据同步全量可以成功,增量不行,还报了这个错误是为啥?2024-04-15 11:17:03
java.lang.IllegalAccessError: tried to access class io.debezium.embedded.EmbeddedEngineChangeEvent from class com.ververica.cdc.debezium.internal.DebeziumChangeConsumer
at com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.commitOffset(DebeziumChangeConsumer.java:80)
at com.ververica.cdc.debezium.DebeziumSourceFunction.notifyCheckpointComplete(DebeziumSourceFunction.java:486)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)

展开
收起
小小鹿鹿鹿 2024-04-22 19:23:53 112 分享 版权
阿里云 AI 助理回答

问题分析

根据您提供的错误信息,java.lang.IllegalAccessError: tried to access class io.debezium.embedded.EmbeddedEngineChangeEvent from class com.ververica.cdc.debezium.internal.DebeziumChangeConsumer,可以判断该问题与Debezium和Flink CDC的版本兼容性有关。具体来说,IllegalAccessError通常是由于类加载器无法访问某些类或方法导致的,这可能是由于以下原因之一:

  1. 依赖冲突:Flink CDC和Debezium之间的依赖版本不匹配。
  2. 类加载问题:某些类在运行时被错误地加载或隔离。
  3. 增量同步配置问题:Postgres CDC增量同步需要特定的配置(如Checkpoint和主键声明),如果未正确配置,可能导致增量同步失败。

解决方案

1. 检查依赖版本兼容性

Flink CDC依赖于Debezium来捕获数据库变更事件。如果Flink CDC和Debezium的版本不兼容,可能会导致类似的问题。建议您检查以下内容: - 确保使用的Flink CDC版本与Debezium版本兼容。例如,Flink CDC 2.x通常需要Debezium 1.6或更高版本。 - 如果您使用的是自定义构建的JAR包,请确保所有依赖项的版本一致,避免出现冲突。

操作步骤: 1. 检查pom.xmlbuild.gradle文件中的依赖版本。 2. 如果发现版本不匹配,请升级或降级相关依赖。例如:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.9.0.Final</version>
</dependency>

2. 配置增量快照功能

Postgres CDC增量同步需要启用增量快照功能,并且必须满足以下条件: - 开启Checkpoint:增量快照功能依赖于Flink的Checkpoint机制。 - 声明主键:Source表必须声明主键,否则增量同步可能失败。

操作步骤: 1. 在Flink作业中启用Checkpoint:

env.enableCheckpointing(10 * 60 * 1000); // 每10分钟触发一次Checkpoint
  1. 确保Postgres源表已声明主键。例如:
    ALTER TABLE your_table ADD PRIMARY KEY (id);
    

3. 检查Replication Slot管理

Postgres CDC连接器在增量同步阶段会创建Replication Slot。如果Replication Slot未正确管理,可能导致磁盘空间浪费或同步失败。

操作步骤: 1. 登录Postgres数据库,检查当前的Replication Slot:

SELECT * FROM pg_replication_slots;
  1. 删除不再使用的Replication Slot:
    SELECT pg_drop_replication_slot('slot_name');
    

4. 调整Checkpoint相关参数

如果全量同步成功但增量同步失败,可能是因为Checkpoint配置不合理。建议调整以下参数以优化增量同步性能: - execution.checkpointing.interval:设置合理的Checkpoint间隔时间。 - execution.checkpointing.tolerable-failed-checkpoints:增加容忍的Checkpoint失败次数。

示例配置

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

5. 检查类加载问题

如果上述步骤未能解决问题,可能是类加载器隔离导致的。尝试以下方法: - 使用Flink的classloader.resolve-order参数调整类加载顺序:

classloader.resolve-order: parent-first
  • 确保所有依赖项都通过Flink的lib目录加载,而不是通过用户代码的JAR包。

总结与重要提醒

  • 关键点:此问题的根本原因可能是依赖版本不兼容或增量同步配置不当。请优先检查Flink CDC和Debezium的版本是否匹配,并确保启用了Checkpoint和主键声明。
  • 重要提醒:Postgres CDC增量同步对环境配置要求较高,建议严格按照官方文档进行配置,避免因参数设置不当导致同步失败。
  • 后续操作:如果问题仍未解决,请提供完整的Flink作业日志和配置文件,以便进一步排查。

希望以上解决方案能够帮助您解决问题!您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理