// Create the source table; the connector is mysql-cdc
tableEnvironment.executeSql("CREATE TABLE Data_Input (" +
        " ID INT," +
        " age INT," +
        " PRIMARY KEY (ID) NOT ENFORCED " + // primary key of the MySQL table; it must be set, otherwise lock-free distributed reading and chunk splitting are not possible
        ") " +
        "WITH ('connector' = 'mysql-cdc', " +
        " 'hostname' = '127.0.0.1', " +
        " 'port' = '3307', " +
        " 'username' = 'root', " +
        " 'password' = '123456', " +
        " 'database-name' = 'flink-cdc', " +
        " 'scan.incremental.snapshot.enabled' = 'true'," + // incremental snapshot; when enabled the table is read lock-free and in parallel, enabled by default
        " 'table-name' = 'user_info'," +
        " 'server-id' = '123654'" + // every job needs its own server-id, otherwise the job fails; set the id range to match the parallelism (see the sketch after this snippet)
        ")");
// Create the downstream (sink) table; here the jdbc connector is used to write the data into PostgreSQL
tableEnvironment.executeSql("CREATE TABLE Data_Output (" +
        " ID INT," +
        " age INT, " +
        " PRIMARY KEY (ID) NOT ENFORCED " +
") WITH (" +
" 'connector' = 'jdbc', " +
" 'url' = 'jdbc:postgresql://127.0.0.1:5435/test?currentSchema=mySchema&reWriteBatchedInserts=true'," +
" 'table-name' = 'user_info', " +
" 'username' = 'postgres'," +
" 'password' = 'Guan&*(123'" +
")");
tableEnvironment.executeSql("INSERT INTO Data_Output (SELECT * FROM Data_Input)");
Have you configured checkpointing? If not, have a look at the FAQ; the FAQ material is in the group announcement. (This answer was compiled from the Flink CDC community.)
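A minimal sketch of enabling checkpointing before building the tableEnvironment used above (the class name, the 10-second interval, and the environment setup are assumptions, not part of the original answer); with the incremental snapshot source and the JDBC sink, checkpoints are what drive fault-tolerant position tracking and sink flushing:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcToPostgresJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing, e.g. every 10 seconds (interval is an assumption).
        env.enableCheckpointing(10_000L);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // ... run the CREATE TABLE and INSERT statements shown above ...
    }
}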