flink-CDC-3.0 mysql to doris 数据同步任务 经常报错 stream load error: [LABEL_ALREADY_EXISTS] ,达到重启最大次数后退出
具体版本信息
flink1.18.0
Doris 2.0.2
flink-cdc-3.0.0
flink-cdc-pipeline-connector-mysql-3.0.0.jar
flink-cdc-pipeline-connector-doris-3.0.0.jar
org.apache.doris.flink.exception.DorisBatchLoadException: stream load error: [LABEL_ALREADY_EXISTS]
异常堆栈
2024-04-08 08:18:55
org.apache.doris.flink.exception.DorisBatchLoadException: org.apache.doris.flink.exception.DorisBatchLoadException: stream load error:
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.checkFlushException(DorisBatchStreamLoad.java:172)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.writeRecord(DorisBatchStreamLoad.java:125)
at org.apache.doris.flink.sink.batch.DorisBatchWriter.write(DorisBatchWriter.java:106)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161)
at com.ververica.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:154)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.doris.flink.exception.DorisBatchLoadException: stream load error:
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.load(DorisBatchStreamLoad.java:257)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.run(DorisBatchStreamLoad.java:208)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.doris.flink.exception.DorisBatchLoadException: stream load error: [LABEL_ALREADY_EXISTS]TStatus: errCode = 2, detailMessage = Label [_0_xxxxxxxx_064ad128-ed6c-44b8-99b4-6e715fb637af] has already been used, relate to txn [272201], see more in null
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.load(DorisBatchStreamLoad.java:249)
... 4 more
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您提供的错误信息,stream load error: [LABEL_ALREADY_EXISTS]
是 Doris 数据库在执行 Stream Load 操作时抛出的异常。该错误表明当前任务尝试使用的 Label 已经被之前的任务使用过,而 Doris 的 Stream Load 要求每个 Label 必须是唯一的。
此问题通常发生在以下场景: 1. 任务重启:Flink CDC 任务在失败后重启,但未正确处理已提交的 Label。 2. Checkpoint 或 Savepoint 恢复:任务从 Checkpoint 或 Savepoint 恢复时,重复提交了相同的 Label。 3. 并发写入冲突:多个任务或线程同时向 Doris 写入数据,导致 Label 冲突。
结合您的版本信息(Flink 1.18.0、Doris 2.0.2、Flink CDC 3.0.0),以下是详细的解决方案。
Doris Sink 提供了 sink.enable-2pc
和 sink.label-prefix
参数,用于解决 Label 冲突问题。建议调整以下参数:
sink.enable-2pc=true
sink.label-prefix=unique_prefix_
sink.enable-2pc
:启用两阶段提交(2PC),确保 Flink 与 Doris 的事务一致性。sink.label-prefix
:为每个任务生成唯一的 Label 前缀,避免不同任务之间的 Label 冲突。重要:sink.label-prefix
应设置为全局唯一的值,例如任务名称或时间戳。
Flink CDC 依赖 Checkpoint 机制来保证 Exactly-Once 语义。如果 Checkpoint 配置不当,可能导致任务重启时重复提交数据。建议优化以下参数:
execution.checkpointing.interval=10s
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.timeout=5min
execution.checkpointing.min-pause=5s
execution.checkpointing.interval
:设置合理的 Checkpoint 间隔,避免频繁触发。execution.checkpointing.mode
:确保使用 EXACTLY_ONCE 模式。execution.checkpointing.timeout
:增加超时时间,避免因网络延迟导致 Checkpoint 失败。execution.checkpointing.min-pause
:设置最小间隔,避免连续触发 Checkpoint。如果任务已经运行了一段时间,Doris 中可能积累了大量历史 Label。可以通过以下步骤清理:
登录 Doris FE 节点,执行以下 SQL 查询历史 Label:
SHOW LOAD WHERE LABEL = '_0_xxxxxxxx_064ad128-ed6c-44b8-99b4-6e715fb637af';
确认无用的 Label 后,手动删除相关事务:
CANCEL LOAD FROM database_name WHERE LABEL = '_0_xxxxxxxx_064ad128-ed6c-44b8-99b4-6e715fb637af';
注意:清理 Label 时需谨慎操作,确保不会影响正在运行的任务。
您当前使用的版本为 Flink CDC 3.0.0 和 Doris 2.0.2。建议升级到最新稳定版本,以修复已知问题并提升兼容性:
flink-doris-connector-1.16-1.5.2.jar
或更高版本。升级后,请重新测试任务,确保问题已解决。
高并发场景下,多个任务可能同时向 Doris 写入数据,导致 Label 冲突。建议:
parallelism.default
设置为较低值(如 1)。为了进一步排查问题,建议启用详细日志并监控任务运行状态:
启用 DEBUG 日志: 在 log4j.properties
文件中添加以下配置:
logger.doris.name = org.apache.doris.flink
logger.doris.level = DEBUG
监控任务状态: 使用 Flink Web UI 和 Doris FE 的监控页面,观察任务的 Checkpoint 和 Stream Load 状态。
通过以上步骤,您可以有效解决 LABEL_ALREADY_EXISTS
错误。核心措施包括启用 Doris Sink 的幂等性配置、优化 Flink CDC 的 Checkpoint 参数、清理历史 Label 以及升级相关组件版本。如果问题仍未解决,请提供更详细的日志信息以便进一步分析。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。