Using 'scan.startup.mode' = 'initial' to read the full snapshot plus incremental changes.
The full configuration is shown below. The result: only the last row of the snapshot is read, and incremental reading never continues.
String nocdb = "CREATE TABLE products (\n" +
        "  STU_NO STRING,\n" +
        "  STU_NAME STRING,\n" +
        "  CARD_TYPE_CODE STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'oracle-cdc',\n" +
        "  'hostname' = '172.16.67.88',\n" +
        "  'port' = '1521',\n" +
        "  'url' = 'jdbc:oracle:thin:@//172.16.67.88:1521/LHRSDB',\n" +
        "  'username' = 'flinkuser',\n" +
        "  'password' = 'flinkpw',\n" +
        "  'database-name' = 'LHRSDB',\n" +
        "  'schema-name' = 'FLINKUSER',\n" +
        "  'scan.startup.mode' = 'initial',\n" +
        "  'debezium.log.mining.strategy' = 'online_catalog',\n" +
        "  'debezium.log.mining.continuous.mine' = 'true',\n" +
        "  'debezium.database.tablename.case.insensitive' = 'false',\n" +
        "  'table-name' = 'NOCDB_STU_INFO')";
tenv.executeSql(nocdb);
TableResult tableResult = tenv.executeSql("select * from products");
tableResult.print();
I'm running into the same problem. Has it been solved? Any pointers would be appreciated. My environment is Oracle 12c, Flink 1.16, and oracle-cdc 3.0.
Judging from the information you provided, there may be several issues:
Database connectivity: confirm that the database connection actually works. Try connecting from the command line or another client to verify (see the probe sketch below).
SQL query: confirm the SQL query itself is correct. Run the same query from the command line or another tool and check that it returns all of the data.
Flink SQL configuration: confirm the Flink SQL configuration is correct, in particular that the 'scan.startup.mode' parameter is set properly.
If none of the above resolves the problem, you may need to provide more information, such as the Flink version, the Oracle database version, and a complete code sample, so the issue can be located.
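For the first check, a minimal plain-JDBC probe can rule out connectivity and permission problems. This is only a sketch: the connection URL, credentials, and table name are copied from the question, and the Oracle JDBC driver (ojdbc) is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class OracleConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Connection details copied from the table definition in the question.
        String url = "jdbc:oracle:thin:@//172.16.67.88:1521/LHRSDB";
        try (Connection conn = DriverManager.getConnection(url, "flinkuser", "flinkpw");
             Statement stmt = conn.createStatement();
             // If this count matches the expected row count, plain reads work
             // and the problem is more likely in the CDC configuration.
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM FLINKUSER.NOCDB_STU_INFO")) {
            if (rs.next()) {
                System.out.println("FLINKUSER.NOCDB_STU_INFO rows: " + rs.getLong(1));
            }
        }
    }
}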
Full + incremental read of a MySQL table, written to Kafka in changelog-json format, observing the RowKind types and the number of rows affected.
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    env.setParallelism(3);
    // note: incremental sync requires checkpointing to be enabled
    env.enableCheckpointing(10000);
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
    tableEnvironment.executeSql("CREATE TABLE demoOrders (\n" +
            "  `order_id` INTEGER,\n" +
            "  `order_date` DATE,\n" +
            "  `order_time` TIMESTAMP(3),\n" +
            "  `quantity` INT,\n" +
            "  `product_id` INT,\n" +
            "  `purchaser` STRING,\n" +
            "  PRIMARY KEY (order_id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'mysql-cdc',\n" +
            "  'hostname' = 'localhost',\n" +
            "  'port' = '3306',\n" +
            "  'username' = 'cdc',\n" +
            "  'password' = '123456',\n" +
            "  'database-name' = 'test',\n" +
            "  'table-name' = 'demo_orders',\n" +
            // full + incremental sync
            "  'scan.startup.mode' = 'initial'\n" +
            ")");
    tableEnvironment.executeSql("CREATE TABLE sink (\n" +
            "  `order_id` INTEGER,\n" +
            "  `order_date` DATE,\n" +
            "  `order_time` TIMESTAMP(3),\n" +
            "  `quantity` INT,\n" +
            "  `product_id` INT,\n" +
            "  `purchaser` STRING,\n" +
            "  PRIMARY KEY (order_id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'topic' = 'mqTest02',\n" +
            "  'format' = 'changelog-json'\n" +
            ")");
    tableEnvironment.executeSql("insert into sink select * from demoOrders");
}
Full snapshot output:
{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"} 修改表数据,增量捕获:
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"} {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}
Possible causes:
The SQL statement is wrong, so only the last row is read. Check the SQL statement and make sure it actually reads the full snapshot.
Oracle database version issues: some versions of the Oracle JDBC driver have known problems when reading full data. Try upgrading the driver.
Flink version issues: older Flink versions carry known bugs; consider upgrading to the latest version.
Database cursor settings: if the Oracle cursor misbehaves while reading, only the last row may come through. Try reconfiguring the cursor.
Configuration issues, for example a misconfigured JDBC connection pool or data source. Check the configuration files, or restart the Flink job.
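One further check comes from the MySQL example above, whose comment notes that incremental sync requires checkpointing to be enabled. Below is a minimal sketch of the Oracle job from the question with checkpointing turned on; the DDL is the one from the question (abbreviated), and this is offered as something to try rather than a confirmed fix for the reported symptom.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OracleCdcInitialJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Incremental sync requires checkpointing (see the note in the MySQL example).
        env.enableCheckpointing(10000);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Same table definition as in the question; see the full WITH options above.
        String nocdb = "CREATE TABLE products (\n" +
                "  STU_NO STRING,\n" +
                "  STU_NAME STRING,\n" +
                "  CARD_TYPE_CODE STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'oracle-cdc',\n" +
                "  'hostname' = '172.16.67.88',\n" +
                "  'port' = '1521',\n" +
                "  'username' = 'flinkuser',\n" +
                "  'password' = 'flinkpw',\n" +
                "  'database-name' = 'LHRSDB',\n" +
                "  'schema-name' = 'FLINKUSER',\n" +
                "  'table-name' = 'NOCDB_STU_INFO',\n" +
                "  'scan.startup.mode' = 'initial'\n" +
                ")";
        tenv.executeSql(nocdb);
        // Streams the snapshot first, then keeps printing redo-log changes.
        tenv.executeSql("SELECT * FROM products").print();
    }
}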