开发者社区> 问答> 正文

flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张

场景: canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序; 若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢? 例如mysql实例db1中有表test, statusCREATE TABLE test ( id int(11) NOT NULL, name varchar(255) NOT NULL, time datetime NOT NULL, status int(11) NOT NULL, PRIMARY KEY (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE TABLE status ( status int(11) NOT NULL, name varchar(255) NOT NULL, PRIMARY KEY (status)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE TABLE test (id INT,name VARCHAR(255),time TIMESTAMP(3),status INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='canal-json', 'canal-json.ignore-parse-errors'='true');CREATE TABLE status (status INT,name VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='canal-json', 'canal-json.ignore-parse-errors'='true');

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-07 15:24:42 2160 0
1 条回答
写回答
取消 提交回答
  • 另外,我理解下你的需求是 db1.test 同步到 db2.test, db1.status 同步到 db2.status?

    多表的有序同步是指?

    我理解你只需要像定义 db1.test -> db2.test 一样,定义好 db1.status binlog table 然后 insert

    into 到 db2.status mysql table就行了。*来自志愿者整理的flink邮件归档

    2021-12-07 16:34:34
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载