请教下各位大佬,FlinkCDC OceanBase connector,不支持两张cdc表的表?

问题1:请教下各位大佬,FlinkCDC OceanBase connector,不支持两张cdc表的表join么?select t1.id, t2.name FROM table1 t1 INNER JOIN table2 t2 ON t1.id = t2.id;

t1表和t2表都是cdc的表

我做这样大操作,就报错了2023-07-27 14:43:52,279 ERROR com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - LogProxyClient exception
com.oceanbase.clogproxy.client.exception.LogProxyClientException: LogProxy refused handshake request: code: 1
message: "Failed to create oblogreader"

at com.oceanbase.clogproxy.client.connection.ClientHandler.handleErrorResponse(ClientHandler.java:228) 但是单表的select 是正常的

问题2:这个要加上是么?image.png

展开
收起
真的很搞笑 2023-08-01 14:42:25 134 分享 版权
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    Flink CDC OceanBase Connector 支持两张 CDC 表的 Join 操作,您可以通过使用 Flink 的 Table API 或者 SQL API 实现。

    具体实现方式取决于您使用的 Flink 版本和 API 类型。以下是基于 Flink 1.14 版本的示例代码,仅供参考:

    使用 Table API 实现 Join 操作
    java
    Copy
    // 创建 TableEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // 创建 Table1 和 Table2
    Table table1 = tEnv.from("cdc_table1");
    Table table2 = tEnv.from("cdc_table2");

    // 执行 Join 操作
    Table result = table1.join(table2).where("id = id").select("id, name");

    // 输出结果
    DataStream dataStream = tEnv.toAppendStream(result, Row.class);
    dataStream.print();
    使用 SQL API 实现 Join 操作
    java
    Copy
    // 创建 TableEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // 注册 Table1 和 Table2
    tEnv.executeSql("CREATE TABLE table1 (id INT, name STRING) WITH (...)");
    tEnv.executeSql("CREATE TABLE table2 (id INT, age INT) WITH (...)");

    // 执行 Join 操作
    Table result = tEnv.sqlQuery("SELECT t1.id, t2

    2023-08-01 23:01:31
    赞同 展开评论
  • 回答1:join 走的是fink 的双流join,你是数据丢了还是语法错误?logproxy.client.id你设置了吗,每张表都不一样,此回答整理自钉群“Flink CDC 社区”

    2023-08-01 14:50:07
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理