Flink CDC里pg到pg数据同步全量可以成功,增量不行,还报了这个错误是为啥?2024-04-15 11:17:03
java.lang.IllegalAccessError: tried to access class io.debezium.embedded.EmbeddedEngineChangeEvent from class com.ververica.cdc.debezium.internal.DebeziumChangeConsumer
at com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.commitOffset(DebeziumChangeConsumer.java:80)
at com.ververica.cdc.debezium.DebeziumSourceFunction.notifyCheckpointComplete(DebeziumSourceFunction.java:486)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您提供的错误信息,java.lang.IllegalAccessError: tried to access class io.debezium.embedded.EmbeddedEngineChangeEvent from class com.ververica.cdc.debezium.internal.DebeziumChangeConsumer
,可以判断该问题与Debezium和Flink CDC的版本兼容性有关。具体来说,IllegalAccessError
通常是由于类加载器无法访问某些类或方法导致的,这可能是由于以下原因之一:
Flink CDC依赖于Debezium来捕获数据库变更事件。如果Flink CDC和Debezium的版本不兼容,可能会导致类似的问题。建议您检查以下内容: - 确保使用的Flink CDC版本与Debezium版本兼容。例如,Flink CDC 2.x通常需要Debezium 1.6或更高版本。 - 如果您使用的是自定义构建的JAR包,请确保所有依赖项的版本一致,避免出现冲突。
操作步骤: 1. 检查pom.xml
或build.gradle
文件中的依赖版本。 2. 如果发现版本不匹配,请升级或降级相关依赖。例如:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>1.9.0.Final</version>
</dependency>
Postgres CDC增量同步需要启用增量快照功能,并且必须满足以下条件: - 开启Checkpoint:增量快照功能依赖于Flink的Checkpoint机制。 - 声明主键:Source表必须声明主键,否则增量同步可能失败。
操作步骤: 1. 在Flink作业中启用Checkpoint:
env.enableCheckpointing(10 * 60 * 1000); // 每10分钟触发一次Checkpoint
ALTER TABLE your_table ADD PRIMARY KEY (id);
Postgres CDC连接器在增量同步阶段会创建Replication Slot。如果Replication Slot未正确管理,可能导致磁盘空间浪费或同步失败。
操作步骤: 1. 登录Postgres数据库,检查当前的Replication Slot:
SELECT * FROM pg_replication_slots;
SELECT pg_drop_replication_slot('slot_name');
如果全量同步成功但增量同步失败,可能是因为Checkpoint配置不合理。建议调整以下参数以优化增量同步性能: - execution.checkpointing.interval
:设置合理的Checkpoint间隔时间。 - execution.checkpointing.tolerable-failed-checkpoints
:增加容忍的Checkpoint失败次数。
示例配置:
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
如果上述步骤未能解决问题,可能是类加载器隔离导致的。尝试以下方法: - 使用Flink的classloader.resolve-order
参数调整类加载顺序:
classloader.resolve-order: parent-first
lib
目录加载,而不是通过用户代码的JAR包。希望以上解决方案能够帮助您解决问题!您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。