Use case: we use Canal to ship the MySQL binlog to Kafka, and then want to consume the Kafka data with Flink, filter out the DELETE operations, and write the remaining rows to a filesystem for historical data archiving. I checked the official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata

A sample Canal message:

{
  "data": [{"id": "111", "name": "scooter", "description": "Big 2-wheel scooter", "weight": "5.18"}],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {"id": "INTEGER", "name": "VARCHAR(255)", "description": "VARCHAR(512)", "weight": "FLOAT"},
  "old": [{"weight": "5.15"}],
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {"id": 4, "name": 12, "description": 12, "weight": 7},
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}

The metadata example from the docs:

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);

With canal-json I can only get the original database, table, sql-type, pk-names, and ingestion-timestamp metadata fields; there is no metadata field exposing the operation type (the type field in the message). Is there some other way to do this? *From the volunteer-curated Flink mailing list archive
First consume the raw Canal Kafka topic with the json or raw format, filter out the DELETE records, and write the result to a new Kafka topic; then create a canal-json table on top of the new topic to land the data. *From the volunteer-curated Flink mailing list archive
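Below is a minimal Flink SQL sketch of that two-step pipeline, assuming the json-format variant. The topic names (canal_topic, canal_topic_no_delete), group ids, and the envelope schema are assumptions to adapt to your setup; modeling the data/old payloads as ARRAY<MAP<STRING, STRING>> works because Canal serializes column values as JSON strings.

-- Step 1: read the raw Canal envelope with the plain 'json' format,
-- which exposes 'type' as an ordinary column (canal-json does not).
-- Envelope fields not declared here (mysqlType, sqlType, isDdl, ...)
-- are dropped in the copy; add them the same way if you need them.
CREATE TABLE canal_raw (
  `data` ARRAY<MAP<STRING, STRING>>,
  `old` ARRAY<MAP<STRING, STRING>>,
  `database` STRING,
  `table` STRING,
  `pkNames` ARRAY<STRING>,
  `es` BIGINT,
  `ts` BIGINT,
  `type` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal_topic',                       -- assumed source topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'filterGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Sink table with the same envelope schema on a new topic; the LIKE
-- clause copies the schema and connector options, overriding 'topic'.
CREATE TABLE canal_no_delete WITH (
  'topic' = 'canal_topic_no_delete'              -- assumed new topic
) LIKE canal_raw;

-- Copy everything except DELETE events into the new topic.
INSERT INTO canal_no_delete
SELECT * FROM canal_raw WHERE `type` <> 'DELETE';

-- Step 2: a canal-json table over the filtered topic, with physical
-- columns matching the payload fields (schema as in the docs example).
CREATE TABLE products_no_delete (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'canal_topic_no_delete',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'archiveGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json'
);

One thing to watch when landing this table: the filesystem sink is append-only, while canal-json still turns the remaining UPDATE events into retractions, so those may need the same filtering treatment (or a plain-json landing table) depending on what the archive should contain.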