开发者社区> 问答> 正文

flink-cdc 无法读出binlog,程序也不报错

hi,大佬们好,我用写了段java代码,通过cdc读取mysql的数据并通过print-table打印出来,但实际没打印,代码也不报错,一直处于运行状态 

idea中运行信息如下:  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  SLF4J: Defaulting to no-operation (NOP) logger implementation  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further  details.  SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".  SLF4J: Defaulting to no-operation MDCAdapter implementation.  SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further  details.  Loading class com.mysql.jdbc.Driver'. This is deprecated. The new driver  class iscom.mysql.cj.jdbc.Driver'. The driver is automatically registered  via the SPI and manual loading of the driver class is generally unnecessary.  十二月 02, 2020 9:31:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient  connect  信息: Connected to localhost:3307 at mysql-bin.000002/2556 (sid:6668, cid:11) 

mysql相关配置:  binlog_format ROW  log_bin ON  binlog_row_image FULL 

java主要代码如下 

import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import org.apache.flink.table.api.EnvironmentSettings;  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 

public class mysqlSourceAndSink {  public static void main(String[] args) {  StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();  EnvironmentSettings envSettings =  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,  envSettings); 

String cdc_user_id = "create table cdc_user_id(\n" +  "id INT \n" +  ",pid INT \n" +  ",PRIMARY KEY (id) NOT ENFORCED \n" +  ") WITH (\n" +  " 'connector' = 'mysql-cdc',\n" +  " 'hostname' = 'localhost',\n" +  " 'port' = '3307',\n" +  " 'username' = 'flink',\n" +  " 'password' = '123456',\n" +  " 'server-id' = '6668',\n" +  // " 'server-time-zone' = 'UTC',\n" +  " 'server-time-zone' = 'Asia/Shanghai',\n" +  " 'database-name' = 'flinktest',\n" +  " 'table-name' = 'flinktest.user_id'\n" +  ")"; 

String table_print = "create table table_print( \n" +  "id bigint,\n" +  "pid bigint\n" +  ") WITH(\n" +  "'connector' = 'print',\n" +  "'print-identifier' = 'pppp',\n" +  "'standard-error' = 'true'\n" +  ")"; 

tableEnv.executeSql(table_print);  // tableEnv.executeSql(jdbc_user_id);  tableEnv.executeSql(cdc_user_id); 

String cdcUserPid2Print = "insert into table_print select id, pid  from cdc_user_id";  tableEnv.executeSql(cdcUserPid2Print); 

*  mysql的err日志中有如下打印* 

2020-12-02T13:29:52.412476Z 8 [Note] Start binlog_dump to  master_thread_id(8) slave_server(6668), pos(mysql-bin.000002, 2273)  2020-12-02T13:31:03.416362Z 7 [Note] Aborted connection 7 to db:  'unconnected' user: 'flink' host: 'localhost' (Got an error reading  communication packets)  2020-12-02T13:31:36.146219Z 8 [Note] Aborted connection 8 to db:  'unconnected' user: 'flink' host: 'localhost' (failed on flush_net())  2020-12-02T13:31:45.575431Z 11 [Note] Start binlog_dump to  master_thread_id(11) slave_server(6668), pos(mysql-bin.000002, 2556) 

*  账号有相关授权*  create user 'flink'@'%' identified by '123456';  GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT  ON . TO 'flink' IDENTIFIED BY '123456';  flush privileges; 

另外我用flink的jdbc测试也是类似的,不报错,也没把数据print出来 

烦请大佬帮忙看看我这是什么原因,谢谢*来自志愿者整理的flink邮件归档

展开
收起
CCCC 2021-12-02 15:10:40 1283 0
1 条回答
写回答
取消 提交回答
  • 字段类型没按官网的要求对应起来,对应起来后正常了*来自志愿者整理的FLINK邮件归档

    2021-12-02 15:52:25
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
PolarDB-X 2.0 全局 Binlog 与备份恢复能 立即下载