Flink CDC这个问题咋接近,debezium在读取mysql值转换的时候,什么原因?

Flink CDC这个问题咋接近,debezium在读取mysql值转换的时候,datetime类型值是0000-00-00 00:00:00直接转为null了?Invalid value '0000-00-00 00:00:00' stored in column 'created_at' of table 'sink_algorithm.act_price_predict' converted to empty value

展开
收起
真的很搞笑 2023-12-20 08:25:55 156 分享 版权
阿里云 AI 助理回答

问题分析

在使用 Flink CDC 和 Debezium 读取 MySQL 数据时,datetime 类型的值为 0000-00-00 00:00:00 时被转换为 null,并抛出类似以下错误:

Invalid value '0000-00-00 00:00:00' stored in column 'created_at' of table 'sink_algorithm.act_price_predict' converted to empty value

该问题的根本原因在于 MySQL 中允许存储非法的 datetime 值(如 0000-00-00 00:00:00),而 Debezium 在解析这些值时无法正确处理,默认将其转换为 null 或空值。


解决方案

1. 调整 MySQL 配置以避免非法值

MySQL 的 sql_mode 参数可以控制是否允许存储非法的日期值。如果希望禁止存储 0000-00-00 00:00:00 这样的非法值,可以通过修改 sql_mode 来实现。

操作步骤: - 检查当前 MySQL 的 sql_mode

SHOW VARIABLES LIKE 'sql_mode';
  • 如果结果中不包含 NO_ZERO_DATENO_ZERO_IN_DATE,则需要添加这两个模式:
    SET GLOBAL sql_mode = 'STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE';
    
  • 修改完成后,重新启动 MySQL 服务以确保配置生效。

注意: 修改 sql_mode 可能会影响现有数据的插入和更新操作,请确保在测试环境中验证后再应用到生产环境。


2. 自定义 Debezium 序列化器处理非法值

如果无法修改 MySQL 配置,或者需要兼容现有的非法值,可以通过自定义 Debezium 的序列化器来处理 0000-00-00 00:00:00

操作步骤: - 在 DataStream API 中实现自定义的 DebeziumDeserializationSchema,并在解析 datetime 类型时对非法值进行特殊处理。例如:

public class CustomDeserializer implements DebeziumDeserializationSchema<RowData> {
    @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        Struct value = (Struct) record.value();
        String createdAt = value.getString("created_at");
        
        // 处理非法值
        if ("0000-00-00 00:00:00".equals(createdAt)) {
            createdAt = null; // 或者设置为默认值,如 "1970-01-01 00:00:00"
        }
        
        // 继续处理其他字段...
        RowData rowData = convertToRowData(value);
        out.collect(rowData);
    }

    private RowData convertToRowData(Struct value) {
        // 转换逻辑...
        return rowData;
    }
}
  • 在 Flink 作业中使用自定义的序列化器:
    MySqlSource<RowData> mySqlSource = MySqlSource.<RowData>builder()
      .deserializer(new CustomDeserializer())
      .build();
    

注意: 自定义序列化器需要根据实际业务需求调整非法值的处理逻辑。


3. 通过 Debezium 配置忽略非法值

Debezium 提供了配置选项,可以在读取数据时忽略或替换非法值。

操作步骤: - 在 Debezium 的配置文件中添加以下参数:

time.precision.mode=adaptive_time_microseconds
date.format=yyyy-MM-dd
datetime.format=yyyy-MM-dd HH:mm:ss
  • 如果需要将非法值替换为默认值,可以使用 transforms 插件。例如:
    transforms=ReplaceZeroDate
    transforms.ReplaceZeroDate.type=org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.ReplaceZeroDate.replacements=created_at:1970-01-01 00:00:00
    

注意: 使用 transforms 插件需要确保 Kafka Connect 环境支持相关功能。


4. 升级 Debezium 版本

某些版本的 Debezium 对非法日期值的处理存在已知问题。建议升级到最新稳定版本,以获得更好的兼容性和修复。

操作步骤: - 检查当前使用的 Debezium 版本。 - 升级到最新版本(例如 1.9.x 或更高版本)。 - 在升级后重新测试作业,确认问题是否解决。

注意: 升级 Debezium 版本可能需要同步升级 Flink CDC 连接器版本,请确保版本兼容性。


总结与建议

  • 优先推荐:调整 MySQL 的 sql_mode 参数,禁止存储非法日期值,从源头解决问题。
  • 次优选择:通过自定义 Debezium 序列化器或配置 transforms 插件,灵活处理非法值。
  • 长期维护:定期升级 Debezium 和 Flink CDC 连接器版本,确保使用最新的功能和修复。

重要提醒: 在生产环境中修改配置或代码前,请务必在测试环境中充分验证,避免对现有业务造成影响。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理