开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

麻烦帮忙看一下 在使用flink sql 读取oracle 全量时 仅能读取最后一条数据 ?

使用参数 'scan.startup.mode' = 'initial' 读取 全量+增量

全部配置如下: 结果只能读最后一条全量 且 无法续读增量

String nocdb ="CREATE TABLE products (\n" +
			"     STU_NO STRING ,\n" +
			"     STU_NAME STRING , \n" +
			"     CARD_TYPE_CODE STRING \n" +
			"     ) WITH (\n" +
			"     'connector' = 'oracle-cdc',\n" +
			"     'hostname' = '172.16.67.88',\n" +
			"     'port' = '1521',\n" +
			"     'url' = 'jdbc:oracle:thin:@//172.16.67.88:1521/LHRSDB',\n" +
			"     'username' = 'flinkuser',\n" +
			"     'password' = 'flinkpw',\n" +
			"     'database-name' = 'LHRSDB',\n" +
			"     'schema-name' = 'FLINKUSER',\n" +
			"     'scan.startup.mode' = 'initial',\n" +
			"     'debezium.log.mining.strategy' = 'online_catalog',\n" +
			"     'debezium.log.mining.continuous.mine' = 'true',\n" +
			"     'debezium.database.tablename.case.insensitive'='false',\n"+
			"     'table-name' = 'NOCDB_STU_INFO')";

tenv.executeSql(pdbsql);

TableResult tableResult = tenv.executeSql("select * from products"); tableResult.print();

image.png

展开
收起
sympathetic 2023-03-17 17:34:12 716 0
4 条回答
写回答
取消 提交回答
  • 我也是碰到这个问题,请问这个问题解决了嘛,求大佬指点,我的是oracle12c,flink1.16,oracle-cdc是3.0

    2024-06-25 18:07:44
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    从您提供的信息来看,可能存在以下几个问题:

    数据库连接问题:您需要确认您的数据库连接是否正常。可以尝试使用命令行或其他方式验证是否能够正常连接到数据库。 SQL查询问题:您需要确认您的SQL查询是否正确。可以尝试在命令行或其他方式中执行相同的SQL查询,看是否能够正确地获取到全部数据。 Flink SQL配置问题:您需要确认您的Flink SQL的配置是否正确。可以检查一下您的'scan.startup.mode'参数是否正确设置。 如果以上问题都没有解决您的问题,您可能需要提供更多的信息,比如您使用的是哪个版本的Flink、Oracle数据库版本、以及完整的代码示例等,以便更好地定位问题所在。

    2023-03-17 20:11:18
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    全量读取+增量读取 Mysql表数据,以changelog-json 格式写入kafka,观察 RowKind 类型及影响的数据条数。

    public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); env.setParallelism(3); // note: 增量同步需要开启CK env.enableCheckpointing(10000); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);

        tableEnvironment.executeSql(" CREATE TABLE demoOrders (n" +
                "         `order_id` INTEGER ,n" +
                "          `order_date` DATE ,n" +
                "          `order_time` TIMESTAMP(3),n" +
                "          `quantity` INT ,n" +
                "          `product_id` INT ,n" +
                "          `purchaser` STRING,n" +
                "           primary key(order_id)  NOT ENFORCED" +
                "         ) WITH (n" +
                "          'connector' = 'mysql-cdc',n" +
                "          'hostname' = 'localhost',n" +
                "          'port' = '3306',n" +
                "          'username' = 'cdc',n" +
                "          'password' = '123456',n" +
                "          'database-name' = 'test',n" +
                "          'table-name' = 'demo_orders'," +
                            //  全量 + 增量同步   
                "          'scan.startup.mode' = 'initial'      " +
                " )");
    
            tableEnvironment.executeSql("CREATE TABLE sink (n" +
                "         `order_id` INTEGER ,n" +
                "          `order_date` DATE ,n" +
                "          `order_time` TIMESTAMP(3),n" +
                "          `quantity` INT ,n" +
                "          `product_id` INT ,n" +
                "          `purchaser` STRING,n" +
                "          primary key (order_id)  NOT ENFORCED " +
                ") WITH (n" +
                "    'connector' = 'kafka',n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',n" +
                "    'topic' = 'mqTest02',n" +
                "    'format' = 'changelog-json' "+
                ")");
    
            tableEnvironment.executeSql("insert into sink select * from demoOrders");}
    

    全量数据输出:

    {"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"} {"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"} 修改表数据,增量捕获:

    更新 1005 的值

    {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"} {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}

    2023-03-17 17:51:15
    赞同 展开评论 打赏
  • 月移花影,暗香浮动

    可能是以下原因导致的:

    1. SQL语句写错了,只读取了最后一条数据。需要检查SQL语句,确保正确读取全量数据。

    2. Oracle数据库的版本问题,部分版本的Oracle JDBC驱动在读取全量数据时会存在一些问题。可以尝试更新驱动程序版本。

    3. Flink程序的版本问题,如果使用的Flink版本较旧,可能会存在一些已知问题,需要升级到最新版本。

    4. 数据库游标设置问题,如果Oracle游标在读取数据时出现了问题,可能会导致只读取最后一条数据。可以尝试重新配置游标。

    5. 配置问题,例如未正确配置JDBC连接池、数据源等。可以检查配置文件是否正确,或者重启Flink程序。

    2023-03-17 17:38:21
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    SQL Server 2017 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载

    相关镜像