开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink-cdc3.0,设置整库同步只变更库名,表名和表结构都不变,需要如何设置?

我用的是flink-cdc3.0,mysql同步到doris,设置整库同步,只变更库名,表名和表结构都不变,需要如何设置?6ceb2df099020ed95e1b2334b8eef9f4.png

展开
收起
小小鹿鹿鹿 2024-01-18 00:52:13 153 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    要在Flink CDC中将MySQL同步到Doris,并设置整库同步,只变更库名、表名和表结构都不变,可以按照以下步骤进行配置:

    1. 首先,确保已经安装了Flink CDC的MySQL连接器(flink-connector-mysql)。

    2. 在Flink配置文件(如flink-conf.yaml)中,添加以下配置项:

    cdc.source.mysql.hostname: <MySQL服务器地址>
    cdc.source.mysql.port: <MySQL服务器端口>
    cdc.source.mysql.username: <MySQL用户名>
    cdc.source.mysql.password: <MySQL密码>
    cdc.source.mysql.database-name: <要同步的数据库名>
    cdc.source.mysql.table-includes: <要同步的表名,多个表名用逗号分隔>
    cdc.sink.doris.fenodes: <Doris集群的FE节点地址,多个地址用逗号分隔>
    cdc.sink.doris.query-mode: stream
    cdc.sink.doris.username: <Doris用户名>
    cdc.sink.doris.password: <Doris密码>
    cdc.sink.doris.database-name: <目标数据库名>
    cdc.sink.doris.table-name: <目标表名>
    cdc.sink.doris.column-mappings: <列映射关系,格式为"源列名=目标列名",多个映射用逗号分隔>
    cdc.sink.doris.batch-size: <批处理大小,单位为行数>
    cdc.sink.doris.max-retries: <最大重试次数>
    
    1. 在Flink程序中,创建一个DebeziumSourceFunction实例,用于从MySQL读取数据,并将其写入Doris。示例代码如下:
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.debezium.DebeziumSourceFunction;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class MySqlToDoris {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            // 注册MySQL连接器
            env.registerConnector("mysql", new MySqlConnector());
    
            // 创建MySQL数据源
            DataStream<Row> mysqlSource = env.addSource(new DebeziumSourceFunction<>(
                    "mysql",
                    "<MySQL服务器地址>",
                    <MySQL服务器端口>,
                    "<MySQL用户名>",
                    "<MySQL密码>",
                    "<要同步的数据库名>",
                    "<要同步的表名,多个表名用逗号分隔>"));
    
            // 将MySQL数据源转换为Doris数据源
            DataStream<Row> dorisSink = mysqlSource
                    .map(row -> {
                        // 在这里对数据进行处理,例如修改列名等
                        return row;
                    })
                    .addSink(tableEnv.sink("<目标数据库名>", "<目标表名>"));
    
            env.execute("MySqlToDoris");
        }
    }
    
    1. 运行Flink程序,即可实现MySQL到Doris的整库同步。注意,如果需要变更库名、表名和表结构,可以在上述代码中的map操作中进行处理。
    2024-01-19 15:38:53
    赞同 展开评论 打赏
  • 目前不支持变更库名。参考:
    https://github.com/ververica/flink-cdc-connectors/pull/2908
    。此回答来自钉群Flink CDC 社区。

    2024-01-18 11:24:29
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载