开发者社区> 问答> 正文

不生效 有谁知道么

// 创建表,connector使用mysql-cdc tableEnvironment.executeSql("CREATE TABLE Data_Input (ID INT," + " age INT," + " PRIMARY KEY (ID) NOT ENFORCED " + //mysql表的主键,这个必须设置,否则不能无锁分布式读取和切块 ") " + "WITH ('connector' = 'mysql-cdc', " + "'hostname' = '127.0.0.1', " + "'port' = '3307', " + "'username' = 'root', " + "'password' = '123456', " + "'database-name' = 'flink-cdc', " + " 'scan.incremental.snapshot.enabled' = 'true'," + //增量式快照启动,启用后可以无锁分布式读表,默认启用 "'table-name' = 'user_info'," + " 'server-id' = '123654'" + //server-id,每个程序都得有一个独自的server-id,否则程序会报错,id区间按并行度的数量进行设置 ")");

// 创建下游数据表,这里使用print类型的connector,将数据直接打印出来 tableEnvironment.executeSql("CREATE TABLE Data_Output (" + " ID INT," + " age INT, " + " PRIMARY KEY (ID) NOT ENFORCED " +

    ") WITH (" +
    "    'connector' = 'jdbc', " +
    "    'url' = 'jdbc:postgresql://127.0.0.1:5435/test?currentSchema=mySchema&reWriteBatchedInserts=true'," +
    "    'table-name' = 'user_info', " +
    "    'username' = 'postgres'," +
    "    'password' = 'Guan&*(123'" +
    ")");

tableEnvironment.executeSql("INSERT INTO Data_Output (SELECT * FROM Data_Input)");

展开
收起
游客3oewgrzrf6o5c 2022-06-21 18:15:24 254 0
1 条回答
写回答
取消 提交回答
  • checkpoint配置了吗?没搞看下FAQ,群公告里有FAQ资料(此答案整理自Flink CDC 社区)

    2022-06-21 18:38:21
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载