开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink cdc 集群的mysql要怎么写呢?

flink cdc 集群的mysql要怎么写呢?mysql集群存储节点:
10.85.25.201 3213-3216
10.85.25.202 3213-3216
10.85.25.203 3213-3216
10.85.25.204 3213-3216

WITH (
'connector' = 'mysql-cdc',
'hostname' = '??',
'port' = '??',
'username' = 'test',
'password' = 'test',
'database-name' = 'SETL_DB',
'table-name' = 'acct');

展开
收起
真的很搞笑 2024-01-23 15:49:09 66 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink CDC集群中,要配置MySQL集群存储节点,需要将hostnameport参数设置为相应的MySQL节点地址和端口。以下是配置示例:

    WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '10.85.25.201',
      'port' = '3213',
      'username' = 'test',
      'password' = 'test',
      'database-name' = 'SETL_DB',
      'table-name' = 'acct'
    )
    

    请根据实际情况替换hostnameport的值。

    2024-01-23 22:36:05
    赞同 展开评论 打赏
  • 在Flink CDC中,由于MySQL集群存储节点分布在多个主机和端口上,你不能直接指定一个hostname和port去连接整个集群。Flink CDC的MySQL-CDC连接器目前不直接支持连接MySQL集群,但你可以为集群中的每个MySQL实例配置一个单独的source,并通过debezium引擎采集binlog变化,然后将各个source产生的数据流合并到一起。

    为了连接到这样一个MySQL集群,你需要分别为集群中的每个MySQL节点创建一个Flink CDC Source,并配置好相应的hostname和port。例如,为四个节点分别配置如下:

    -- Node 1
    CREATE CATALOG mysql_catalog_1 WITH (
      'type' = 'jdbc',
      'connector' = 'debezium-mysql-cdc',
      'hostname' = '10.85.25.201',
      'port' = '3306', -- 使用实际的MySQL端口,这里假设是3306
      'username' = 'test',
      'password' = 'test',
      'database-name' = 'SETL_DB',
      'table-name' = 'acct'
    );
    
    -- Node 2
    CREATE CATALOG mysql_catalog_2 WITH (
      'type' = 'jdbc',
      'connector' = 'debezium-mysql-cdc',
      'hostname' = '10.85.25.202',
      'port' = '3306',
      'username' = 'test',
      'password' = 'test',
      'database-name' = 'SETL_DB',
      'table-name' = 'acct'
    );
    
    -- Node 3
    CREATE CATALOG mysql_catalog_3 WITH (
      'type' = 'jdbc',
      'connector' = 'debezium-mysql-cdc',
      'hostname' = '10.85.25.203',
      'port' = '3306',
      'username' = 'test',
      'password' = 'test',
      'database-name' = 'SETL_DB',
      'table-name' = 'acct'
    );
    
    -- Node 4
    CREATE CATALOG mysql_catalog_4 WITH (
      'type' = 'jdbc',
      'connector' = 'debezium-mysql-cdc',
      'hostname' = '10.85.25.204',
      'port' = '3306',
      'username' = 'test',
      'password' = 'test',
      'database-name' = 'SETL_DB',
      'table-name' = 'acct'
    );
    
    -- 创建Flink流处理作业,分别从四个catalog读取数据,然后联合处理或合并流
    -- 例如,假设你有两个SourceFunctions读取数据
    -- 并通过union操作符合并成一个DataStream
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
    tEnv.executeSql("CREATE TEMPORARY TABLE mysql_table_1 WITH ('connector' = 'mysql_catalog_1')");
    tEnv.executeSql("CREATE TEMPORARY TABLE mysql_table_2 WITH ('connector' = 'mysql_catalog_2')");
    tEnv.executeSql("CREATE TEMPORARY TABLE mysql_table_3 WITH ('connector' = 'mysql_catalog_3')");
    tEnv.executeSql("CREATE TEMPORARY TABLE mysql_table_4 WITH ('connector' = 'mysql_catalog_4')");
    
    // 合并数据流
    Table result = tEnv.from("mysql_table_1")
                    .union(tEnv.from("mysql_table_2"))
                    .union(tEnv.from("mysql_table_3"))
                    .union(tEnv.from("mysql_table_4"));
    
    // 然后可以继续执行其他SQL操作或进行下游处理
    
    2024-01-23 16:39:54
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像