关于Flink CDC,Debezium 监听orcl 怎么操作?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
flink使用oracle cdc连接器时数据延迟较大,需加入debezium参数进行调整,以下代码均已加入该配置。
Properties properties = new Properties();
//设置数据库连接参数(表名大小写转换)
properties.setProperty("database.tablename.case.insensitive","false");
properties.setProperty("database.connection.adapter", "logminer");
//降低oracle cdc 延迟
properties.setProperty("log.mining.strategy", "online_catalog");
properties.setProperty("log.mining.continuous.mine", "true");
//创建Stream环境
Configuration configuration = new Configuration();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
//oracle 连接器配置
SourceFunction<String> build = OracleSource.<String>builder()
.hostname("localhost")
.port(1521)
.database("XE") // monitor XE database
.schemaList("test") // monitor test schema
.tableList("DBTEST.tbtest1," +
"DBTEST.TBTEST2") // monitor tables
.username("flinkuser")
.password("flinkpw")
//从最新位置读取,可自行修改initial()、latest()
.startupOptions(com.ververica.cdc.connectors.oracle.table.StartupOptions.latest())
.deserializer(new FlinkCdcDataDeserializationSchema()) // converts SourceRecord to JSON String
.debeziumProperties(properties)
.build();
env.setParallelism(2);
DataStreamSource<String> stringDataStreamSource = env.addSource(build);
stringDataStreamSource.addSink(new CustomSink());
env.execute();
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。