hi,大佬们好,我用写了段java代码,通过cdc读取mysql的数据并通过print-table打印出来,但实际没打印,代码也不报错,一直处于运行状态
idea中运行信息如下: SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder". SLF4J: Defaulting to no-operation MDCAdapter implementation. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details. Loading class com.mysql.jdbc.Driver'. This is deprecated. The new driver class is
com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary. 十二月 02, 2020 9:31:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect 信息: Connected to localhost:3307 at mysql-bin.000002/2556 (sid:6668, cid:11)
mysql相关配置: binlog_format ROW log_bin ON binlog_row_image FULL
java主要代码如下
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class mysqlSourceAndSink { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);
String cdc_user_id = "create table cdc_user_id(\n" + "id INT \n" + ",pid INT \n" + ",PRIMARY KEY (id) NOT ENFORCED \n" + ") WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = 'localhost',\n" + " 'port' = '3307',\n" + " 'username' = 'flink',\n" + " 'password' = '123456',\n" + " 'server-id' = '6668',\n" + // " 'server-time-zone' = 'UTC',\n" + " 'server-time-zone' = 'Asia/Shanghai',\n" + " 'database-name' = 'flinktest',\n" + " 'table-name' = 'flinktest.user_id'\n" + ")";
String table_print = "create table table_print( \n" + "id bigint,\n" + "pid bigint\n" + ") WITH(\n" + "'connector' = 'print',\n" + "'print-identifier' = 'pppp',\n" + "'standard-error' = 'true'\n" + ")";
tableEnv.executeSql(table_print); // tableEnv.executeSql(jdbc_user_id); tableEnv.executeSql(cdc_user_id);
String cdcUserPid2Print = "insert into table_print select id, pid from cdc_user_id"; tableEnv.executeSql(cdcUserPid2Print);
* mysql的err日志中有如下打印*
2020-12-02T13:29:52.412476Z 8 [Note] Start binlog_dump to master_thread_id(8) slave_server(6668), pos(mysql-bin.000002, 2273) 2020-12-02T13:31:03.416362Z 7 [Note] Aborted connection 7 to db: 'unconnected' user: 'flink' host: 'localhost' (Got an error reading communication packets) 2020-12-02T13:31:36.146219Z 8 [Note] Aborted connection 8 to db: 'unconnected' user: 'flink' host: 'localhost' (failed on flush_net()) 2020-12-02T13:31:45.575431Z 11 [Note] Start binlog_dump to master_thread_id(11) slave_server(6668), pos(mysql-bin.000002, 2556)
* 账号有相关授权* create user 'flink'@'%' identified by '123456'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'flink' IDENTIFIED BY '123456'; flush privileges;
另外我用flink的jdbc测试也是类似的,不报错,也没把数据print出来
烦请大佬帮忙看看我这是什么原因,谢谢*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。