开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

使用FLink CDC 3.0 ,将MySQL数据同步到SR遇到一些问题

运行环境
MySQL:5.7x
StarRocks:2.5
FLink:1.18.0
FLink-CDC:3.0

配置文件如下

source:
  type: mysql
  hostname: xxx
  port: 3306
  username: xxx
  password: xxx
  tables: xxx_db.\.*
  server-id: 1
  server-time-zone: Asia/Shanghai

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://xxx:9030
  load-url: xxx:8030
  username: root
  password: ""
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to StarRocks
  parallelism: 1

DDL问题:

  1. 添加字段,可以同步成功,但任务会报错,必须取消任务重启后才能恢复正常
  2. 删除字段,可以同步成功,问题同上
  3. 修改字段类型,不会同步
    问题1的错误日志片段如下:
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: temp of AddColumnEvent is already existed
     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
     at com.ververica.cdc.runtime.operators.schema.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:123)
     ... 30 more
    Caused by: java.lang.IllegalArgumentException: temp of AddColumnEvent is already existed
     at com.ververica.cdc.common.utils.SchemaUtils.applyAddColumnEvent(SchemaUtils.java:73)
     at com.ververica.cdc.common.utils.SchemaUtils.applySchemaChangeEvent(SchemaUtils.java:53)
     at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager.applySchemaChange(SchemaManager.java:113)
     at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.handleSchemaChangeRequest(SchemaRegistryRequestHandler.java:102)
     at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleCoordinationRequest(SchemaRegistry.java:157)
     at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:143)
     at org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1070)
     at org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:616)
     at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
     at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    

DML问题:

  1. 取消同步任务,在MySQL上删除一条数据,然后重启同步任务,删除的数据无法同步在SR上删除;但同样的操作,添加数据可以同步

展开
收起
哈酒 2024-01-23 17:04:35 299 0
3 条回答
写回答
取消 提交回答
  • 由于您没有提供具体的问题描述,我无法为您提供针对性的解决方案。但是,我可以给您一些建议来解决FLink CDC 3.0同步MySQL数据到StarRocks时可能遇到的问题:

    1. 确保您的MySQL和StarRocks版本与FLink CDC 3.0兼容。根据官方文档,FLink CDC 3.0支持的MySQL版本为5.7.x,而StarRocks 2.5应该与FLink CDC 3.0兼容。

    2. 检查您的Flink配置文件(flink-conf.yaml)中是否包含正确的连接器配置。例如,确保您已经添加了以下配置:

    table.exec.source.cdc-events-duplicate: true
    
    1. 在Flink SQL中创建源表和目标表时,确保使用正确的数据类型和字段名称。例如:
    CREATE TABLE mysql_source (
        id INT,
        name STRING,
        age INT
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'test',
        'table-name' = 'user'
    );
    
    CREATE TABLE starrocks_sink (
        id INT,
        name STRING,
        age INT
    ) WITH (
        'connector' = 'starrocks',
        'fenodes' = 'localhost:8030',
        'table.identifier' = 'test_db.user',
        'username' = 'root',
        'password' = '123456'
    );
    
    1. 如果遇到问题,请查看Flink的日志文件以获取更多详细信息。这可以帮助您诊断问题并找到解决方案。

    如果您能提供更多关于问题的详细信息,我将更好地帮助您解决问题。

    2024-02-29 21:59:51
    赞同 展开评论 打赏
  • 这个问题是因为任务没有使用 savepoint 重启。因此重启后会再次进行全量同步,此时insert、update 的操作数据都能够读取到最新的结果,而 delete 数据则因源表不再包含这条数据,下游不会做对应处理。数据仍然保存。
    解决办法是通过 flink savepoint 停止原来的任务,如执行下面命令:

    flink stop [--savepointPath <pathToSavepoint>] <jobID>
    

    并且在 flink-conf.yaml 里设置 execution.savepoint.path 指定 savepoint 路径,重新启动 cdc 任务。这样 cdc 任务会从重启前的位点,逐条处理binlog,正确处理 delete 事件。
    Flink 3.0 还不支持通过 -s/--savepoint 参数指定savepoint 路径。

    2024-01-26 13:12:11
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    这个问题可能是由于Flink CDC在处理删除字段时出现了问题。你可以尝试以下方法解决这个问题:

    1. 检查FLink CDC的版本,确保它是最新的。如果不是,请升级到最新版本,看看问题是否得到解决。

    2. 如果问题仍然存在,你可以尝试在FLink CDC的GitHub仓库中提交一个issue,详细描述你遇到的问题。这样,FLink团队可能会关注这个问题,并在后续版本中修复它。

    3. 作为临时解决方案,你可以尝试在同步任务中使用DELETE操作符,而不是直接删除数据。这样,FLink CDC应该能够正确处理删除操作。例如:

    DELETE FROM your_table WHERE some_condition;
    
    2024-01-24 14:18:00
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像