Flink CDC这个读和写放在一个任务,怎么都没作用了啊?

问题1:Flink CDC这个读和写放在一个任务,怎么都没作用了啊?SourceFunction sourceFunction = OracleSource.builder() .hostname("localhost") .port(1521) .database("ORCL") // monitor XE database .schemaList("scott") // monitor inventory schema .tableList("scott.t_user") // monitor products table .username("scott") .password("123456") //从最新位置读取,可自行修改initial()、latest() //.startupOptions(StartupOptions.initial()) .deserializer(new UserSchema()) // converts SourceRecord to JSON String .build();

    // 启动一个webUI
    Configuration configuration = new Configuration();
    configuration.setInteger(RestOptions.PORT, 8081);
    configuration.setString(RestOptions.ADDRESS, "127.0.0.1");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

    // 将数据写入 Oracle
    env.addSource(sourceFunction).addSink(JdbcSink.sink(
            // 设置插入语句
            "insert INTO \"SCOTT\".\"t_user_copy\"  (\"id\", \"name\", \"age\")  VALUES (?, ?, ?) ",
            // 设置 JdbcStatementBuilder
            (JdbcStatementBuilder<User>) (preparedStatement, value) -> {
                System.out.println("asafag" + value.getName());
                preparedStatement.setString(1, value.getId());
                preparedStatement.setString(2, value.getName());
                preparedStatement.setInt(3, value.getAge());
            },
            // 设置连接器
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("oracle.jdbc.driver.OracleDriver")
                    .withUrl("jdbc:oracle:thin:@127.0.0.1:1521/orcl")
                    .withUsername("scott")
                    .withPassword("123456")
                    .build()
    )).setParallelism(1);

    env.execute("FlinkCDCOracle");

问题2:checkpoint没配置么,我刚接触flink,还不太懂

展开
收起
真的很搞笑 2023-06-29 08:16:45 175 分享 版权
1 条回答
写回答
取消 提交回答
  • 回答1:开启checkpoint后,重启的时候能不能从 上次停止的位置恢复?,此回答整理自钉群“Flink CDC 社区”

    2023-06-29 09:00:29
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理