问题1:请教下各位大佬,FlinkCDC OceanBase connector,不支持两张cdc表的表join么?select t1.id, t2.name FROM table1 t1 INNER JOIN table2 t2 ON t1.id = t2.id;
t1表和t2表都是cdc的表
我做这样大操作,就报错了2023-07-27 14:43:52,279 ERROR com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - LogProxyClient exception
com.oceanbase.clogproxy.client.exception.LogProxyClientException: LogProxy refused handshake request: code: 1
message: "Failed to create oblogreader"
at com.oceanbase.clogproxy.client.connection.ClientHandler.handleErrorResponse(ClientHandler.java:228) 但是单表的select 是正常的
问题2:这个要加上是么?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink CDC OceanBase Connector 支持两张 CDC 表的 Join 操作,您可以通过使用 Flink 的 Table API 或者 SQL API 实现。
具体实现方式取决于您使用的 Flink 版本和 API 类型。以下是基于 Flink 1.14 版本的示例代码,仅供参考:
使用 Table API 实现 Join 操作
java
Copy
// 创建 TableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 创建 Table1 和 Table2
Table table1 = tEnv.from("cdc_table1");
Table table2 = tEnv.from("cdc_table2");
// 执行 Join 操作
Table result = table1.join(table2).where("id = id").select("id, name");
// 输出结果
DataStream dataStream = tEnv.toAppendStream(result, Row.class);
dataStream.print();
使用 SQL API 实现 Join 操作
java
Copy
// 创建 TableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 注册 Table1 和 Table2
tEnv.executeSql("CREATE TABLE table1 (id INT, name STRING) WITH (...)");
tEnv.executeSql("CREATE TABLE table2 (id INT, age INT) WITH (...)");
// 执行 Join 操作
Table result = tEnv.sqlQuery("SELECT t1.id, t2
回答1:join 走的是fink 的双流join,你是数据丢了还是语法错误?logproxy.client.id你设置了吗,每张表都不一样,此回答整理自钉群“Flink CDC 社区”
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。