大佬们 FlinkSQL 能写入Oracle吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
可以,FlinkSQL任务主要由两部分组成,一是数据源创建,就是数据从哪儿来;二是目标结果表,数据写到哪儿去。使用FlinkSQL将对应的结果写入Oracle。
// 数据源表
String sourceDDL =
"CREATE TABLE mysql_binlog (\n" +
" ID STRING,\n" +
" CRON STRING,\n" +
" primary key (ID) not enforced\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = '192.168.1.1',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'database-name' = 'test',\n" +
" 'table-name' = 'test',\n" +
" 'scan.startup.mode' = 'latest-offset'\n" +
")";
// 输出目标表
String sinkDDL =
"CREATE TABLE test_cdc_sink (\n" +
" ID STRING,\n" +
" CRON STRING,\n" +
" primary key (ID) not enforced\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'driver' = 'com.oracle.cj.jdbc.Driver',\n" +
" 'url' = 'jdbc:oracle:thin:@192.168.1.2:1521:dis?serverTimezone=UTC&useSSL=false',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'table-name' = 'test'\n" +
")";
// 简单的聚合处理
String transformDmlSQL = "insert into test_cdc_sink select * from mysql_binlog";
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sinkDDL);
tableEnv.executeSql(transformDmlSQL);
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。