开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问 flink cdc 支持同时监听 同一个实例下 多个库吗?我想用DataStreamSourc

请问 flink cdc 支持同时监听 同一个实例下 多个库吗?我想用DataStreamSource方式 配置 同时监听多个实例 多个库 可以吗?

展开
收起
真的很搞笑 2023-07-13 11:16:48 306 0
4 条回答
写回答
取消 提交回答
  • 同问,楼主实现了吗,可以用一个DatastreamSource通过正则匹配监听多个库吗

    2023-11-21 09:37:53
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    是的,Flink CDC 支持同时监听同一个实例下的多个库。您可以在 scan.incremental 参数中指定多个库和表,并使用多个 cdc_source 函数创建多个数据源。

    以下是一个使用 scan.incremental 参数同时监听多个库和表的示例代码:

    python
    Copy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, TableEnvironment

    from flinkcdc import cdc_source

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = TableEnvironment.create(env_settings)

    创建 Flink CDC 的数据源

    source_ddl = """
    CREATE TABLE my_source_table (
    id INT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'scan.incremental' = 'database1:table1,database2:table2'
    )
    """
    source_table = t_env.sql_query(source_ddl)
    source_stream_1 = cdc_source(source_table)

    source_ddl = """
    CREATE TABLE my_source_table (
    id INT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'scan.incremental' = 'database3:table3,database4:table4'
    )
    """
    source_table = t_env.sql_query(source_ddl)
    source_stream_2 = cdc_source(source_table)

    将数据源连接起来

    source_stream_1.union(source_stream_2) \
    .key_by(lambda row: row.id) \
    .map(...) \
    .print()

    env.execute("My CDC Job")
    在上述示例中,scan.incremental 参数同时指定了多个库和表,用逗号分隔。您可以根据实际情况修改参数值,以匹配您的数据库配置。然后,使用多个 cdc_source 函数分别创建多个数据源,并将它们连接起来进行数据处理。image.png

    2023-07-30 06:54:53
    赞同 展开评论 打赏
  • 是的,Flink CDC 支持同时监听同一个实例下的多个库。您可以使用 DataStreamSource 方式来配置并同时监听多个实例和多个库。

    要配置 Flink CDC 监听多个实例和多个库,可以按照以下步骤进行操作:

    1. 创建 Flink 的 StreamExecutionEnvironment 对象:

       java    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    

    2. 创建 Flink 的 TableEnvironment 对象:

       java    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);    

    3. 针对每个实例和库,创建相应的 CDC 数据源并注册为表:

       ```java    // 实例1,库1    CDCSource.Builder sourceBuilder1 = CDCSource.builder()        .connector("oracle-cdc-connector")  // 设置连接器类型,如 Oracle CDC Connector        .option("hostname", "instance1-hostname")        .option("port", "instance1-port")        .option("database-name", "instance1-dbname")        .option("username", "instance1-username")        .option("password", "instance1-password");

       TableSource tableSource1 = sourceBuilder1.build();    tEnv.registerTableSource("cdc_table1", tableSource1);

       // 实例2,库2    CDCSource.Builder sourceBuilder2 = CDCSource.builder()        .connector("oracle-cdc-connector")  // 设置连接器类型,如 Oracle CDC Connector        .option("hostname", "instance2-hostname")        .option("port", "instance2-port")        .option("database-name", "instance2-dbname")        .option("username", "instance2-username")        .option("password", "instance2-password");

       TableSource tableSource2 = sourceBuilder2.build();    tEnv.registerTableSource("cdc_table2", tableSource2);    ```

       在上述代码中,您需要根据实际情况替换相应的连接器类型和连接参数。

    4. 从注册的表创建 DataStream 对象:

       java    DataStream<Row> dataStream1 = tEnv.toAppendStream(tEnv.from("cdc_table1"), Row.class);    DataStream<Row> dataStream2 = tEnv.toAppendStream(tEnv.from("cdc_table2"), Row.class);    

       您可以进一步对这些 DataStream 进行处理、转换或写入其他 Sink。

    通过以上步骤,您可以配置 Flink CDC 来同时监听多个实例下的多个库。每个实例和库都可以独立地配置其连接器类型和连接参数,并注册为相应的表。

    2023-07-29 23:23:09
    赞同 展开评论 打赏
  • 肯定支持呀,DataStreamSource方式,库名.,库名.,此回答整理自钉群“Flink CDC 社区”

    2023-07-13 13:07:47
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载