开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

为什么Flink CDC中mysql cdc decimal字段类型的值 cdc读取出来 完全不对?

为什么Flink CDC中mysql cdc decimal字段类型的值 cdc读取出来 完全不对啊???image.png image.png image.png

展开
收起
真的很搞笑 2023-07-01 19:48:18 460 0
2 条回答
写回答
取消 提交回答
  • 在 Flink CDC 中,当读取 MySQL 数据库中的 DECIMAL 类型字段时,可能会出现值读取不正确的情况。这是因为 MySQL 中的 DECIMAL 类型是不定精度的数字类型,而 Flink CDC 默认情况下将其读取为字符串类型,导致数据转换错误。

    为了解决这个问题,您可以使用 Flink CDC 提供的 Decimal 类型来正确地读取 MySQL 数据库中的 DECIMAL 类型字段。Decimal 类型是 Flink CDC 特有的数据类型,用于处理不定精度的数字。可以按照以下步骤进行操作:

    1. 在 Flink CDC 的配置文件中,将 DECIMAL 类型字段的数据类型设置为 Decimal。例如,在配置文件中添加 "decimal-handling.mode": "precise",指定精确模式来处理 DECIMAL 类型字段。具体配置如下所示:

    {
      "name": "source",
      "type": "mysql-cdc",
      "config": {
        "hostname": "localhost",
        "port": 3306,
        "username": "root",
        "password": "password",
        "database-name": "test",
        "table-name": "test_table",
        "debezium.snapshot.mode": "initial",
        "debezium.table.types": "table",
        "debezium.transforms": "unwrap",
        "debezium.transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "decimal-handling.mode": "precise"
      }
    }
    

    2. 在 Flink CDC 的代码中,使用 DecimalType 类型来读取 DECIMAL 类型字段的值。在定义表结构时,可以使用 DecimalType.of(precision, scale) 方法来指定 DECIMAL 字段的精度和小数位数。例如:

    import org.apache.flink.table.types.logical.DecimalType;
    
    // ...
    
    tableEnv.connect(source)
      .withFormat(...)
      .withSchema(
        new Schema()
          .field("id", DataTypes.INT())
          .field("name", DataTypes.STRING())
          .field("price", DecimalType.of(10, 2)) // 使用 DecimalType 类型来读取 DECIMAL 类型字段的值
      )
      .createTemporaryTable("source_table");
    

    通过以上步骤,您可以正确地读取 MySQL CDC 中 DECIMAL 类型字段的值。希望这些信息对您有所帮助。如有任何其他问题,请随时提问。

    2023-07-30 13:26:01
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    在 Flink CDC 中,如果读取 MySQL 数据库中的 DECIMAL 类型字段,可能会出现值读取不正确的情况。这是因为 MySQL 数据库中的 DECIMAL 类型字段存储的是不定精度的数字,而 Flink CDC 默认情况下会将其读取为字符串类型,导致数据转换错误。
    为了解决这个问题,您可以使用 Flink CDC 提供的 Decimal 类型来读取 MySQL 数据库中的 DECIMAL 类型字段。Decimal 类型是 Flink CDC 特有的数据类型,用于处理不定精度的数字。具体操作方法如下:
    在 Flink CDC 的配置文件中,将 DECIMAL 类型字段的数据类型设置为 Decimal,例如:
    json
    Copy
    {
    "name": "source",
    "type": "mysql-cdc",
    "config": {
    "hostname": "localhost",
    "port": 3306,
    "username": "root",
    "password": "password",
    "database-name": "test",
    "table-name": "test_table",
    "debezium.snapshot.mode": "initial",
    "debezium.table.types": "table",
    "debezium.transforms": "unwrap",
    "debezium.transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "decimal-handling.mode": "precise"
    }
    }
    在 Flink CDC 的代码中,使用 Decimal 类型来读取 DECIMAL 类型字段的值,例如:
    stylus
    Copy
    import org.apache.flink.table.types.logical.DecimalType;

    // ...

    tableEnv.connect(source)
    .withFormat(...)
    .withSchema(
    new Schema()
    .field("id", DataTypes.INT())
    .field("name", DataTypes.STRING())
    .field("price", DecimalType.of(10, 2)) // 使用 Decimal 类型来读取 DECIMAL 类型字段的值
    )
    .createTemporaryTable("source_table");

    2023-07-30 11:02:34
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像