flink-cdc 无法读出binlog,程序也不报错


idea中运行信息如下:  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further  details.  SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".  SLF4J: Defaulting to no-operation MDCAdapter implementation.  SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further  details.  Loading class com.mysql.jdbc.Driver'. This is deprecated. The new driver  class iscom.mysql.cj.jdbc.Driver'. The driver is automatically registered  via the SPI and manual loading of the driver class is generally unnecessary.  十二月 02, 2020 9:31:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient  connect  信息: Connected to localhost:3307 at mysql-bin.000002/2556 (sid:6668, cid:11) 

mysql相关配置:  binlog_format ROW  log_bin ON  binlog_row_image FULL 


import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.table.api.EnvironmentSettings;  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 

public class mysqlSourceAndSink {  public static void main(String[] args) {  StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();  EnvironmentSettings envSettings =  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,  envSettings); 

String cdc_user_id = "create table cdc_user_id(\n" +  "id INT \n" +  ",pid INT \n" +  ",PRIMARY KEY (id) NOT ENFORCED \n" +  ") WITH (\n" +  " 'connector' = 'mysql-cdc',\n" +  " 'hostname' = 'localhost',\n" +  " 'port' = '3307',\n" +  " 'username' = 'flink',\n" +  " 'password' = '123456',\n" +  " 'server-id' = '6668',\n" +  // " 'server-time-zone' = 'UTC',\n" +  " 'server-time-zone' = 'Asia/Shanghai',\n" +  " 'database-name' = 'flinktest',\n" +  " 'table-name' = 'flinktest.user_id'\n" +  ")"; 

String table_print = "create table table_print( \n" +  "id bigint,\n" +  "pid bigint\n" +  ") WITH(\n" +  "'connector' = 'print',\n" +  "'print-identifier' = 'pppp',\n" +  "'standard-error' = 'true'\n" +  ")"; 

tableEnv.executeSql(table_print);  // tableEnv.executeSql(jdbc_user_id);  tableEnv.executeSql(cdc_user_id); 

String cdcUserPid2Print = "insert into table_print select id, pid  from cdc_user_id";  tableEnv.executeSql(cdcUserPid2Print); 

*  mysql的err日志中有如下打印* 

2020-12-02T13:29:52.412476Z 8 [Note] Start binlog_dump to  master_thread_id(8) slave_server(6668), pos(mysql-bin.000002, 2273)  2020-12-02T13:31:03.416362Z 7 [Note] Aborted connection 7 to db:  'unconnected' user: 'flink' host: 'localhost' (Got an error reading  communication packets)  2020-12-02T13:31:36.146219Z 8 [Note] Aborted connection 8 to db:  'unconnected' user: 'flink' host: 'localhost' (failed on flush_net())  2020-12-02T13:31:45.575431Z 11 [Note] Start binlog_dump to  master_thread_id(11) slave_server(6668), pos(mysql-bin.000002, 2556) 

*  账号有相关授权*  create user 'flink'@'%' identified by '123456';  GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT  ON . TO 'flink' IDENTIFIED BY '123456';  flush privileges; 



  • 字段类型没按官网的要求对应起来,对应起来后正常了*来自志愿者整理的FLINK邮件归档

