Flink CDC这个问题咋接近,debezium在读取mysql值转换的时候,datetime类型值是0000-00-00 00:00:00直接转为null了?Invalid value '0000-00-00 00:00:00' stored in column 'created_at' of table 'sink_algorithm.act_price_predict' converted to empty value
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用 Flink CDC 和 Debezium 读取 MySQL 数据时,datetime
类型的值为 0000-00-00 00:00:00
时被转换为 null
,并抛出类似以下错误:
Invalid value '0000-00-00 00:00:00' stored in column 'created_at' of table 'sink_algorithm.act_price_predict' converted to empty value
该问题的根本原因在于 MySQL 中允许存储非法的 datetime
值(如 0000-00-00 00:00:00
),而 Debezium 在解析这些值时无法正确处理,默认将其转换为 null
或空值。
MySQL 的 sql_mode
参数可以控制是否允许存储非法的日期值。如果希望禁止存储 0000-00-00 00:00:00
这样的非法值,可以通过修改 sql_mode
来实现。
操作步骤: - 检查当前 MySQL 的 sql_mode
:
SHOW VARIABLES LIKE 'sql_mode';
NO_ZERO_DATE
和 NO_ZERO_IN_DATE
,则需要添加这两个模式:
SET GLOBAL sql_mode = 'STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE';
注意: 修改 sql_mode
可能会影响现有数据的插入和更新操作,请确保在测试环境中验证后再应用到生产环境。
如果无法修改 MySQL 配置,或者需要兼容现有的非法值,可以通过自定义 Debezium 的序列化器来处理 0000-00-00 00:00:00
。
操作步骤: - 在 DataStream API 中实现自定义的 DebeziumDeserializationSchema
,并在解析 datetime
类型时对非法值进行特殊处理。例如:
public class CustomDeserializer implements DebeziumDeserializationSchema<RowData> {
@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Struct value = (Struct) record.value();
String createdAt = value.getString("created_at");
// 处理非法值
if ("0000-00-00 00:00:00".equals(createdAt)) {
createdAt = null; // 或者设置为默认值,如 "1970-01-01 00:00:00"
}
// 继续处理其他字段...
RowData rowData = convertToRowData(value);
out.collect(rowData);
}
private RowData convertToRowData(Struct value) {
// 转换逻辑...
return rowData;
}
}
MySqlSource<RowData> mySqlSource = MySqlSource.<RowData>builder()
.deserializer(new CustomDeserializer())
.build();
注意: 自定义序列化器需要根据实际业务需求调整非法值的处理逻辑。
Debezium 提供了配置选项,可以在读取数据时忽略或替换非法值。
操作步骤: - 在 Debezium 的配置文件中添加以下参数:
time.precision.mode=adaptive_time_microseconds
date.format=yyyy-MM-dd
datetime.format=yyyy-MM-dd HH:mm:ss
transforms
插件。例如:
transforms=ReplaceZeroDate
transforms.ReplaceZeroDate.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.ReplaceZeroDate.replacements=created_at:1970-01-01 00:00:00
注意: 使用 transforms
插件需要确保 Kafka Connect 环境支持相关功能。
某些版本的 Debezium 对非法日期值的处理存在已知问题。建议升级到最新稳定版本,以获得更好的兼容性和修复。
操作步骤: - 检查当前使用的 Debezium 版本。 - 升级到最新版本(例如 1.9.x
或更高版本)。 - 在升级后重新测试作业,确认问题是否解决。
注意: 升级 Debezium 版本可能需要同步升级 Flink CDC 连接器版本,请确保版本兼容性。
sql_mode
参数,禁止存储非法日期值,从源头解决问题。transforms
插件,灵活处理非法值。重要提醒: 在生产环境中修改配置或代码前,请务必在测试环境中充分验证,避免对现有业务造成影响。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。