
How do I filter canal-json data by operation type?

Use case: we use canal to ship the MySQL binlog to Kafka, and then want to consume the Kafka data with Flink, filter out the DELETE operations, and write the remaining records to the file system for historical archiving. I checked the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata

A sample canal-json message looks like this:

```json
{
  "data": [
    {"id": "111", "name": "scooter", "description": "Big 2-wheel scooter", "weight": "5.18"}
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {"id": "INTEGER", "name": "VARCHAR(255)", "description": "VARCHAR(512)", "weight": "FLOAT"},
  "old": [{"weight": "5.15"}],
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {"id": 4, "name": 12, "description": 12, "weight": 7},
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}
```

And the table definition:

```sql
CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);
```

With this I can only get the original database, table, sql-type, pk-names, and ingestion-timestamp metadata fields; there is no way to get the `type` field that carries the operation type. Is there any other way to do this? *From the volunteer-curated Flink mailing list archive

塔塔塔塔塔塔 2021-12-02 15:17:33
1 reply
  • First consume the original canal Kafka topic with the json or raw format, filter out the DELETE records, and write the remaining data to a new Kafka topic; then build a canal-json table on top of the new topic to land the data. *From the volunteer-curated Flink mailing list archive
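
    The two-step workaround above can be sketched in Flink SQL. This is a hedged outline, not a verified job: the topic names (`canal_raw_topic`, `canal_no_delete_topic`), the broker address, and the crude string match on `"type":"DELETE"` are all assumptions; in practice the delete check would normally use proper JSON parsing (for example a UDF), since a LIKE match can misfire if the payload happens to contain that substring.

    ```sql
    -- Step 1: read the original canal topic with the raw format, so each
    -- message arrives as one opaque string and nothing is dropped or reshaped.
    CREATE TABLE canal_raw (
      record STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'canal_raw_topic',                 -- assumed source topic
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'raw'
    );

    -- The filtered copy keeps the same raw layout.
    CREATE TABLE canal_no_delete (
      record STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'canal_no_delete_topic',           -- assumed target topic
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'raw'
    );

    -- Drop DELETE events; the LIKE predicate is a placeholder for real
    -- JSON parsing of the top-level "type" field.
    INSERT INTO canal_no_delete
    SELECT record
    FROM canal_raw
    WHERE record NOT LIKE '%"type":"DELETE"%';

    -- Step 2: interpret the filtered topic as canal-json and read it for
    -- archiving, e.g. insert from here into a filesystem sink table.
    CREATE TABLE products_archive (
      id INT,
      name STRING,
      description STRING,
      weight FLOAT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'canal_no_delete_topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'canal-json'
    );
    ```

    The point of the raw format in step 1 is that the message body passes through byte-for-byte, so the intermediate topic stays valid canal-json for step 2.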

    2021-12-02 15:38:02