开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink cdc 整库同步MySQL 如何写入hudi,sink hudi的代码该如何写?

flink cdc 整库同步MySQL 如何写入hudi,sink hudi的代码该如何写?fca307e81aa3d19fba80344a09525f02.png

展开
收起
十一0204 2023-07-19 17:46:00 96 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,可以使用 Hudi 作为 Sink,来将 MySQL 数据库的变更数据写入到 Hudi 中。下面是一个简单的示例,展示如何编写 Flink CDC 应用程序,将 MySQL 数据库的整库数据同步到 Hudi 中。
    编写 Flink CDC 应用程序
    java
    Copy
    public class MySQLToHudiSync {

    public static void main(String[] args) throws Exception {
        // 创建 Flink 环境和表执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
        // 创建 MySQL CDC 数据源
        TableSchema schema = new TableSchema(new String[]{"id", "name", "age"}, new TypeInformation[]{Types.INT(), Types.STRING(), Types.INT()});
        CDCSource<MyRecord> cdcSource = MySQLSource.<MyRecord>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("my_db")
                .tableList("my_table")
                .username("root")
                .password("123456")
                .deserializer(new MyRecordDeserializationSchema(schema))
                .build();
        DataStreamSource<MyRecord> mysqlStream = env.addSource(cdcSource);
    
        // 将 MySQL 数据源转换为 Flink Table
        Table mysqlTable = tableEnv.fromDataStream(mysqlStream, "id, name, age");
    
        // 将 Flink Table 写入 Hudi
        Configuration hudiConf = new Configuration();
        hudiConf.set("hoodie.datasource.write.recordkey.field", "id");
        hudiConf.set("hoodie.datasource.write.partitionpath.field", "age");
        hudiConf.set("hoodie.datasource.write.table.name", "my_table");
        hudiConf.set("hoodie.datasource.hive_sync.enable", "true");
        hudiConf.set("hoodie.datasource.hive_sync.database", "my_db");
        hudiConf.set("hoodie.datasource.hive_sync.table", "my_table");
        hudiConf.set("hoodie.datasource.hive_sync.partition_fields", "age");
        hudiConf.set("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator");
        hudiConf.set("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload");
        hudiConf.set("hoodie.datasource.write.recordkey.prefix", "my_table");
        hudiConf.set("hoodie.datasource.hive_sync.jdbcurl", "jdbc:mysql://localhost:3306/my_db?useSSL=false");
    
        TableSink<Row> hudiSink = new HoodieTableSink(hudiConf);
        tableEnv.registerTableSink("hudi_table", new String[]{"id", "name", "age"}, new TypeInformation[]{Types.INT(), Types.STRING(), Types.INT()}, hudiSink);
        mysqlTable.insertInto("hudi_table");
    
        // 执行 Flink 应用程序
        env.execute("MySQLToHudiSync");
    }
    

    }
    在这个例子中,首先创建 MySQL CDC 数据源,然后将其转换为 Flink Table。接下来

    2023-07-29 20:00:06
    赞同 展开评论 打赏
  • 意中人就是我呀!

    看看自己mysql服务器所在的时间和时区。
    我们在东八区,用utc时间。此回答整理至钉群“Flink CDC 社区”。

    2023-07-19 18:47:49
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像