开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinkcdc 支持sink写入oceanbase数据库吗?

flinkcdc 支持sink写入oceanbase数据库吗?

展开
收起
cuicuicuic 2023-11-15 08:18:43 59 0
4 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    是的,Flink CDC 支持将数据写入 OceanBase 数据库。您可以使用 Flink SQL 的 CREATE TABLE 语句来定义一个 OceanBase 表,并将其作为 Flink CDC 的目标表。
    以下是一个示例:

    CREATE TABLE oceanbase_table (
        id INT,
        name STRING,
        age INT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:oracle:thin:@localhost:1521:xe',
        'table-name' = 'oceanbase_table'
    );
    

    在这个示例中,我们创建了一个名为 oceanbase_table 的 OceanBase 表,并将其用作 Flink CDC 的目标表。您需要根据实际情况替换 urltable-name 参数的值。

    2023-11-15 22:31:26
    赞同 展开评论 打赏
  • 目前开源还不支持,阿里云那边支持,此回答整理自钉群“Flink CDC 社区”

    2023-11-15 12:32:50
    赞同 展开评论 打赏
  • Apache Flink的CDC (Change Data Capture) 目前主要支持以下几种类型的数据库:

    1. MySQL
    2. PostgreSQL
    3. Oracle
    4. DB2
    5. MongoDB
    6. HBase
    7. HDFS

    对于OceanBase,目前Flink CDC并不直接支持。但是,你可以通过其他方式来实现从OceanBase中捕获数据的变化。例如,你可以使用OceanBase的SDK来获取数据的变化,然后将这些变化应用到你的Flink job中。

    2023-11-15 09:39:46
    赞同 展开评论 打赏
  • 支持的。使用示例https://help.aliyun.com/zh/flink/developer-reference/oceanbase-connector?spm=a2c4g.11186623.0.i19

    维表``CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING,proctime` AS PROCTIME()
    ) WITH (
    'connector' = 'datagen'
    );

    CREATE TEMPORARY TABLE oceanbase_dim (
    a INT,
    b VARCHAR,
    c VARCHAR
    ) WITH (
    'connector' = 'oceanbase',
    'url' = '',
    'userName' = '',
    'password' = '',
    'tableName' = ''
    );

    CREATE TEMPORARY TABLE blackhole_sink(
    a INT,
    b STRING
    ) WITH (
    'connector' = 'blackhole'
    );

    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.proctime AS H
    ON T.a = H.a;

    
    结果表(oceanbase)
    ```CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_ocj_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'oceanbase-ocj',
      'url' = '<yourConfigUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'databaseName' = '<yourDatabaseName>',
      'tableName' = '<yourTableName>'
    );
    
    INSERT INTO oceanbase_ocj_sink
    SELECT * FROM datagen_source;
    
    2023-11-15 08:38:09
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
2022 DTCC-阿里云一站式数据库上云最佳实践 立即下载
云时代的数据库技术趋势 立即下载
超大型金融机构国产数据库全面迁移成功实践 立即下载