FlinkCDC这里为啥打印不到数据?而通过普通的source sink 是可以读到数据的 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.使用FLINKSQL DDL模式构建CDC 表
tableEnv.executeSql("CREATE TABLE ad_schedule_test (\n" +
" schedule_id Int,\n" +
"schedule_no String,\n" +
"schedule_name String,\n" +
"schedule_status Int,\n" +
"create_time TIMESTAMP\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc', -- 必须为 'mysql-cdc'\n" +
" 'hostname' = '172.20.150.109', -- 数据库的 IP\n" +
" 'port' = '3406', -- 数据库的访问端口\n" +
" 'username' = 'test', -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)\n" +
" 'password' = 'test', -- 数据库访问的密码\n" +
" 'database-name' = 'bigdata', -- 需要同步的数据库\n" +
" 'scan.incremental.snapshot.enabled' = 'false',\n" +
" 'scan.startup.mode' = 'initial', \n"+
" 'table-name' = 'bigdata.ad_schedule_test' -- 需要同步的数据表名\n" +
" )");
//3.查询数据并转换为流输出
Table table = tableEnv.sqlQuery("select * from coocaa_bigdata_ad_schedule_test");
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
retractStream.print();
//4.启动
env.execute();
这段代码中,FlinkCDC无法打印数据的原因可能是以下几点:
FlinkCDC的日志级别设置过高,导致输出的信息被过滤掉。可以尝试将日志级别设置为DEBUG或INFO,以便查看详细的日志信息。
FlinkCDC的配置可能不正确。请检查配置文件中的参数是否正确,例如数据库连接信息、表名等。
FlinkCDC可能没有正确连接到MySQL数据库。请确保数据库服务器正在运行,并且防火墙允许Flink访问数据库。
数据库中的数据可能没有被正确同步到FlinkCDC。请检查数据库中的数据是否已经被正确插入或更新。
FlinkCDC可能没有正确处理数据。请检查FlinkCDC的实现逻辑,确保它可以正确处理各种类型的数据。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。