mysql表: CREATE TABLE test
( id
int(11) NOT NULL, name
varchar(255) NOT NULL, time
datetime NOT NULL, status
int(11) NOT NULL, PRIMARY KEY (id
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE status
( id
int(11) NOT NULL, name
varchar(255) NOT NULL, PRIMARY KEY (id
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
kafka中数据: // 表test 中insert事件 {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
//表status 中的事件 {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData, 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
*来自志愿者整理的flink邮件归档
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
这个邮件里提到了类似的问题。
https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将
“data”,“mysqlType”等格式不确定的字段定义为String类型,
下游通过udf自己再解析对应的json*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。