flink-CDC-3.0 mysql to doris 数据同步任务 经常报错

flink-CDC-3.0 mysql to doris 数据同步任务 经常报错 stream load error: [LABEL_ALREADY_EXISTS] ,达到重启最大次数后退出

具体版本信息
flink1.18.0
Doris 2.0.2
flink-cdc-3.0.0
flink-cdc-pipeline-connector-mysql-3.0.0.jar
flink-cdc-pipeline-connector-doris-3.0.0.jar

org.apache.doris.flink.exception.DorisBatchLoadException: stream load error: [LABEL_ALREADY_EXISTS]

异常堆栈

2024-04-08 08:18:55
org.apache.doris.flink.exception.DorisBatchLoadException: org.apache.doris.flink.exception.DorisBatchLoadException: stream load error:
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.checkFlushException(DorisBatchStreamLoad.java:172)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.writeRecord(DorisBatchStreamLoad.java:125)
at org.apache.doris.flink.sink.batch.DorisBatchWriter.write(DorisBatchWriter.java:106)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161)
at com.ververica.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:154)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.doris.flink.exception.DorisBatchLoadException: stream load error:
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.load(DorisBatchStreamLoad.java:257)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.run(DorisBatchStreamLoad.java:208)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.doris.flink.exception.DorisBatchLoadException: stream load error: [LABEL_ALREADY_EXISTS]TStatus: errCode = 2, detailMessage = Label [_0_xxxxxxxx_064ad128-ed6c-44b8-99b4-6e715fb637af] has already been used, relate to txn [272201], see more in null
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.load(DorisBatchStreamLoad.java:249)
... 4 more

error001.png

展开
收起
dyyx007 2024-04-08 09:52:00 475 分享 版权
阿里云 AI 助理回答

问题分析

根据您提供的错误信息,stream load error: [LABEL_ALREADY_EXISTS] 是 Doris 数据库在执行 Stream Load 操作时抛出的异常。该错误表明当前任务尝试使用的 Label 已经被之前的任务使用过,而 Doris 的 Stream Load 要求每个 Label 必须是唯一的。

此问题通常发生在以下场景: 1. 任务重启:Flink CDC 任务在失败后重启,但未正确处理已提交的 Label。 2. Checkpoint 或 Savepoint 恢复:任务从 Checkpoint 或 Savepoint 恢复时,重复提交了相同的 Label。 3. 并发写入冲突:多个任务或线程同时向 Doris 写入数据,导致 Label 冲突。

结合您的版本信息(Flink 1.18.0、Doris 2.0.2、Flink CDC 3.0.0),以下是详细的解决方案。


解决方案

1. 启用 Doris Sink 的幂等性配置

Doris Sink 提供了 sink.enable-2pcsink.label-prefix 参数,用于解决 Label 冲突问题。建议调整以下参数:

sink.enable-2pc=true
sink.label-prefix=unique_prefix_
  • sink.enable-2pc:启用两阶段提交(2PC),确保 Flink 与 Doris 的事务一致性。
  • sink.label-prefix:为每个任务生成唯一的 Label 前缀,避免不同任务之间的 Label 冲突。

重要sink.label-prefix 应设置为全局唯一的值,例如任务名称或时间戳。


2. 调整 Flink CDC 的 Checkpoint 配置

Flink CDC 依赖 Checkpoint 机制来保证 Exactly-Once 语义。如果 Checkpoint 配置不当,可能导致任务重启时重复提交数据。建议优化以下参数:

execution.checkpointing.interval=10s
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.timeout=5min
execution.checkpointing.min-pause=5s
  • execution.checkpointing.interval:设置合理的 Checkpoint 间隔,避免频繁触发。
  • execution.checkpointing.mode:确保使用 EXACTLY_ONCE 模式。
  • execution.checkpointing.timeout:增加超时时间,避免因网络延迟导致 Checkpoint 失败。
  • execution.checkpointing.min-pause:设置最小间隔,避免连续触发 Checkpoint。

3. 清理 Doris 中的历史 Label

如果任务已经运行了一段时间,Doris 中可能积累了大量历史 Label。可以通过以下步骤清理:

  1. 登录 Doris FE 节点,执行以下 SQL 查询历史 Label:

    SHOW LOAD WHERE LABEL = '_0_xxxxxxxx_064ad128-ed6c-44b8-99b4-6e715fb637af';
    
  2. 确认无用的 Label 后,手动删除相关事务:

    CANCEL LOAD FROM database_name WHERE LABEL = '_0_xxxxxxxx_064ad128-ed6c-44b8-99b4-6e715fb637af';
    

注意:清理 Label 时需谨慎操作,确保不会影响正在运行的任务。


4. 升级 Flink CDC 和 Doris Connector

您当前使用的版本为 Flink CDC 3.0.0 和 Doris 2.0.2。建议升级到最新稳定版本,以修复已知问题并提升兼容性:

  • Flink CDC:升级至 3.0.x 最新版本。
  • Doris Connector:升级至 flink-doris-connector-1.16-1.5.2.jar 或更高版本。

升级后,请重新测试任务,确保问题已解决。


5. 检查任务并发和资源分配

高并发场景下,多个任务可能同时向 Doris 写入数据,导致 Label 冲突。建议:

  • 降低并发度:将 parallelism.default 设置为较低值(如 1)。
  • 增加资源:确保 Flink TaskManager 和 Doris BE 节点有足够的内存和 CPU 资源。

6. 日志调试与监控

为了进一步排查问题,建议启用详细日志并监控任务运行状态:

  1. 启用 DEBUG 日志: 在 log4j.properties 文件中添加以下配置:

    logger.doris.name = org.apache.doris.flink
    logger.doris.level = DEBUG
    
  2. 监控任务状态: 使用 Flink Web UI 和 Doris FE 的监控页面,观察任务的 Checkpoint 和 Stream Load 状态。


总结

通过以上步骤,您可以有效解决 LABEL_ALREADY_EXISTS 错误。核心措施包括启用 Doris Sink 的幂等性配置、优化 Flink CDC 的 Checkpoint 参数、清理历史 Label 以及升级相关组件版本。如果问题仍未解决,请提供更详细的日志信息以便进一步分析。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理