I'm using Flink 1.11.1 for CDC and found that only about 100 rows per second get written to MySQL. Is there a way to optimize this, or do you have suggestions for another batch-write approach? The code is as follows:

// Kafka source that reads Debezium changelog records (debezium-json format)
String sourceDdl = "CREATE TABLE debezium_source ("
        + " id STRING NOT NULL, name STRING, description STRING, weight DOUBLE"
        + ") WITH ("
        + " 'connector' = 'kafka-0.11',"
        + " 'topic' = 'test0717',"
        + " 'properties.bootstrap.servers' = '172.22.20.206:9092',"
        + " 'scan.startup.mode' = 'group-offsets',"
        + " 'properties.group.id' = 'test',"
        + " 'format' = 'debezium-json',"
        + " 'debezium-json.schema-include' = 'false',"
        + " 'debezium-json.ignore-parse-errors' = 'true')";
tEnv.executeSql(sourceDdl);
System.out.println("init source ddl successful ==>" + sourceDdl);

// JDBC (MySQL) sink; the primary key makes the connector write in upsert mode
String sinkDdl = "CREATE TABLE sink ("
        + " id STRING NOT NULL,"
        + " name STRING,"
        + " description STRING,"
        + " weight DOUBLE,"
        + " PRIMARY KEY (id) NOT ENFORCED"
        + ") WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true',"
        + " 'table-name' = 'table-out',"
        + " 'driver' = 'com.mysql.cj.jdbc.Driver',"
        + " 'sink.buffer-flush.interval' = '1s',"
        + " 'sink.buffer-flush.max-rows' = '1000',"
        + " 'username' = 'DataPip',"
        + " 'password' = 'DataPip')";
tEnv.executeSql(sinkDdl);
System.out.println("init sink ddl successful ==>" + sinkDdl);
// Continuously copy the changelog from Kafka into MySQL
String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
System.out.println("execute dml ==>" + dml);
tEnv.executeSql(dml);

// Also print the same stream to stdout for debugging
tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') "
        + "LIKE debezium_source (EXCLUDING ALL)");
tEnv.executeSql("INSERT INTO print_table SELECT id, name, description, weight FROM debezium_source");
*From the volunteer-compiled Flink mailing list archive
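Have you observed backpressure caused by the sink not keeping up? Alternatively, try increasing the flush interval so that each buffer accumulates more data before it is written.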
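A minimal sketch of the suggestion above, assuming the same sink table as in the question. `SinkDdlBuilder` and `buildSinkDdl` are hypothetical helpers (not part of Flink) that pull the two buffer-flush options out as parameters so they can be raised. In the Flink 1.11 JDBC connector the buffer is flushed once it holds `sink.buffer-flush.max-rows` rows, and also on a timer every `sink.buffer-flush.interval`; the values `5s` and `5000` used here are illustrative, not benchmarked.

```java
/**
 * Hypothetical helper: builds the JDBC sink DDL from the question with the
 * buffer-flush options exposed as parameters, so each flush can carry more rows.
 */
public class SinkDdlBuilder {

    static String buildSinkDdl(String flushInterval, int flushMaxRows) {
        return "CREATE TABLE sink ("
                + " id STRING NOT NULL,"
                + " name STRING,"
                + " description STRING,"
                + " weight DOUBLE,"
                + " PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'jdbc',"
                + " 'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true',"
                + " 'table-name' = 'table-out',"
                + " 'driver' = 'com.mysql.cj.jdbc.Driver',"
                // Timer-based flush: buffered rows are written at least this often.
                + " 'sink.buffer-flush.interval' = '" + flushInterval + "',"
                // Size-based flush: the buffer is written once it holds this many rows.
                + " 'sink.buffer-flush.max-rows' = '" + flushMaxRows + "',"
                + " 'username' = 'DataPip',"
                + " 'password' = 'DataPip'"
                + ")";
    }

    public static void main(String[] args) {
        // Illustrative values: flush every 5 s, or earlier once 5000 rows are buffered.
        String ddl = buildSinkDdl("5s", 5000);
        System.out.println(ddl);
        // In the actual job this string would be passed to tEnv.executeSql(ddl).
    }
}
```

Raising `max-rows` alone only changes anything if more rows than that arrive within one interval, so whether a longer interval helps depends on where the bottleneck is; that is why checking the sink for backpressure first is worthwhile.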