关于Flink CDC,Debezium 监听orcl 怎么操作?

关于Flink CDC,Debezium 监听orcl 怎么操作?

展开
收起
wenti 2023-03-13 14:17:58 278 分享 版权
1 条回答
写回答
取消 提交回答
  • flink使用oracle cdc连接器时数据延迟较大,需加入debezium参数进行调整,以下代码均已加入该配置。

    Properties properties = new Properties();
    //设置数据库连接参数(表名大小写转换)
    properties.setProperty("database.tablename.case.insensitive","false");
    properties.setProperty("database.connection.adapter", "logminer");
    //降低oracle cdc 延迟
    properties.setProperty("log.mining.strategy", "online_catalog");
    properties.setProperty("log.mining.continuous.mine", "true");
    
    //创建Stream环境
    Configuration configuration = new Configuration();
    
    StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    
    //oracle 连接器配置
    SourceFunction<String> build = OracleSource.<String>builder()
                    .hostname("localhost")
                    .port(1521)
                    .database("XE") // monitor XE database
                    .schemaList("test") // monitor test schema
                    .tableList("DBTEST.tbtest1," +
                                "DBTEST.TBTEST2") // monitor tables
                    .username("flinkuser")
                    .password("flinkpw")
                    //从最新位置读取,可自行修改initial()、latest()
                    .startupOptions(com.ververica.cdc.connectors.oracle.table.StartupOptions.latest())
                    .deserializer(new FlinkCdcDataDeserializationSchema()) // converts SourceRecord to JSON String
                    .debeziumProperties(properties)
                    .build();
    
    env.setParallelism(2);
    
    DataStreamSource<String> stringDataStreamSource = env.addSource(build);
    
    stringDataStreamSource.addSink(new CustomSink());
    
    env.execute();
    
    
    2023-03-17 07:36:52
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理