
Does Flink SQL not support publishing MySQL CDC data to Kafka in canal-json format?

Hi everyone. flink-cdc-connectors embeds the Debezium engine, so it likely does not support the Canal format.

https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
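
(Not part of the original reply -- a minimal sketch, assuming a Debezium-style changelog is acceptable in place of Canal's: since mysql-cdc is Debezium-based, Flink's built-in 'debezium-json' format can serialize the same change stream to Kafka. The column list is trimmed for brevity; binlog_table refers to the source table defined in the quoted question below.)

    CREATE TABLE kafka_sink_dbz (
        id INT,
        name STRING,
        status INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = '${topic}',
        'properties.bootstrap.servers' = '${bootstrap.servers}',
        'format' = 'debezium-json'  -- built-in changelog format matching the Debezium engine
    );

    INSERT INTO kafka_sink_dbz
    SELECT id, name, status FROM binlog_table;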

casel.chen <casel_chan@126.com> wrote on Tue, Apr 20, 2021, at 6:18 PM:

The goal is to use a Flink job to provide functionality similar to a canal server.

    CREATE TABLE binlog_table (
        id INT,
        name STRING,
        sys_id STRING,
        sequence INT,
        filter STRING,
        tag STRING,
        remark STRING,
        create_date TIMESTAMP,
        update_date TIMESTAMP,
        reserve STRING,
        sys_name STRING,
        metric_seq INT,
        advanced_function STRING,
        value_type STRING,
        value_field STRING,
        status INT,
        syn_date TIMESTAMP,
        confirmer STRING,
        confirm_time TIMESTAMP,
        index_explain STRING,
        field_name STRING,
        tag_values STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '${mysql.hostname}',
        'port' = '3306',
        'username' = '${mysql.username}',
        'password' = '${mysql.password}',
        'database-name' = '${mysql.database}',
        'table-name' = '${mysql.table}'
    );

    CREATE TABLE kafka_sink (
        id INT,
        name STRING,
        sys_id STRING,
        sequence INT,
        filter STRING,
        tag STRING,
        remark STRING,
        create_date TIMESTAMP,
        update_date TIMESTAMP,
        reserve STRING,
        sys_name STRING,
        metric_seq INT,
        advanced_function STRING,
        value_type STRING,
        value_field STRING,
        status INT,
        syn_date TIMESTAMP,
        confirmer STRING,
        confirm_time TIMESTAMP,
        index_explain STRING,
        field_name STRING,
        tag_values STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = '${topic}',
        'properties.bootstrap.servers' = '${bootstrap.servers}',
        'format' = 'canal-json'
    );

    INSERT INTO kafka_sink
    SELECT * FROM binlog_table;

The output looks like this:

    {
      "data": [
        {
          "id": 3,
          "name": "自动付款接口BuyETC金额",
          "sys_id": "0184",
          "sequence": 2,
          "filter": "(a=1)",
          "tag": "MerId(商户号)",
          "remark": "O",
          "create_date": "2020-11-02 15:01:31",
          "update_date": "2021-04-07 09:23:59",
          "reserve": "",
          "sys_name": "NHL",
          "metric_seq": 0,
          "advanced_function": "",
          "value_type": "sum",
          "value_field": "value",
          "status": 1,
          "syn_date": "2021-01-28 19:31:36",
          "confirmer": null,
          "confirm_time": null,
          "index_explain": "aa",
          "field_name": null,
          "tag_values": null
        }
      ],
      "type": "INSERT"
    }

This is not the standard canal JSON format. Switching to the upsert-kafka connector did not work either:

    CREATE TABLE kafka_sink (
        id INT,
        name STRING,
        sys_id STRING,
        sequence INT,
        filter STRING,
        tag STRING,
        remark STRING,
        create_date TIMESTAMP,
        update_date TIMESTAMP,
        reserve STRING,
        sys_name STRING,
        metric_seq INT,
        advanced_function STRING,
        value_type STRING,
        value_field STRING,
        status INT,
        syn_date TIMESTAMP,
        confirmer STRING,
        confirm_time TIMESTAMP,
        index_explain STRING,
        field_name STRING,
        tag_values STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = '${topic}',
        'properties.bootstrap.servers' = '${bootstrap.servers}',
        'key.format' = 'json',
        'value.format' = 'json'
    );

The output data looks like this:

    {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 12:27:30","update_date":"2021-04-06 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}

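(Not from the original thread -- for context: Flink's canal-json serializer writes only the "data" and "type" fields, which is exactly the reduced output shown above, whereas a message from an actual canal server carries a fuller envelope. The sample below is a hypothetical illustration; the example_db/example_table names and the timestamp values are made up:)

    {
      "data": [{"id": 3, "name": "自动付款接口BuyETC金额"}],
      "old": null,
      "database": "example_db",
      "table": "example_table",
      "pkNames": ["id"],
      "isDdl": false,
      "type": "INSERT",
      "es": 1611833496000,
      "ts": 1611833496123
    }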
毛毛虫雨 2021-12-05 21:59:20
1 reply
  • Don't use flink-cdc; just use the kafka connector directly and set the format to canal-json.
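
    (A minimal sketch of that suggestion, not from the original answer: let a canal server ship the MySQL binlog to a Kafka topic, then read it in Flink with the plain kafka connector and the canal-json format. The topic, consumer group, and columns here are illustrative.)

        CREATE TABLE binlog_source (
            id INT,
            name STRING,
            status INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = '${topic}',                          -- topic the canal server writes to
            'properties.bootstrap.servers' = '${bootstrap.servers}',
            'properties.group.id' = 'flink-canal-reader',  -- hypothetical consumer group
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'canal-json'
        );

    Flink then decodes each Canal message into INSERT/UPDATE/DELETE changelog rows, so downstream queries see the same change stream a mysql-cdc source would produce.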

    2021-12-06 11:37:03
