用flinkcdc读取mysql的binlog,mysql中原先字段类型为decimal的,输出之后全变成了字母乱码?代码中只是进行了数据的读取,然后输出到了控制台和本地文件。就是这种原先是金额的字段,他们的值都变了,
"points":"AA==","balance":"DZfS",这个是代码,中间什么操作都没做,就是读取数据源,然后打印输出
properties.setProperty("decimal.handling.mode","double");,
,此回答整理自钉群“Flink CDC 社区”
这个问题可能是由于 Flink CDC 在读取 MySQL 的 binlog 时,对于 decimal 类型的字段解析不正确导致的。你可以尝试在 Flink CDC 的配置中添加 fieldDelimiter 属性,指定字段分隔符,以便正确解析数据。
以下是一个示例配置:
FlinkCDC mysqlSrc = FlinkCDC.forMySql()
.hostname("localhost")
.port(3306)
.username("root")
.password("your-password")
.database("your-database")
.table("your-table")
.fieldDelimiter(",,“) // 添加字段分隔符
.startFromLatest()
.build();
然后,你可以在 Flink 的 DataStream 阶段使用 JsonDebeziumDeserializationSchema 对数据进行反序列化,代码如下:
DataStreamSource ds = env.fromSource(
mysqlSrc, WatermarkStrategy.noWatermarks(), sourceName: "mysqlSrc");
ds.print();
env.execute("Flink CDC Example");
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。