大佬们,flink cdc msyql 到kafka,有好的案例或教程么?

大佬们,flink cdc msyql 到kafka,有好的案例或教程么?

展开
收起
毛毛虫雨 2022-11-27 09:58:54 553 分享 版权
1 条回答
写回答
取消 提交回答
  • 全栈JAVA领域创作者

    当然有,以下是一个简单的 Flink CDC MySQL 到 Kafka 的示例:

    在 Maven 中添加以下依赖项:

    xml org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-jdbc_${scala.binary.version} ${flink.version} org.apache.flink flink-formats-json ${flink.version} mysql mysql-connector-java 8.0.27 创建一个 Flink SQL Job,读取 MySQL CDC 数据并将其写入 Kafka。

    java import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkCDCMySQLToKafka {

    public static void main(String[] args) throws Exception {
        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
        // 注册 MySQL CDC 表
        String ddl = "CREATE TABLE mysql_cdc (\n" +
                "  `id` INT,\n" +
                "  `name` STRING,\n" +
                "  `age` INT,\n" +
                "  `create_time` TIMESTAMP(3),\n" +
                "  `update_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 'user',\n" +
                "  'debezium.snapshot.locking.mode' = 'none',\n" +
                "  'debezium.snapshot.mode' = 'initial',\n" +
                "  'debezium.table.ignore-builtin' = 'true',\n" +
                "  'debezium.transforms' = 'unwrap,addMetadata',\n" +
                "  'debezium.transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',\n" +
                "  'debezium.transforms.addMetadata.type' = 'org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema$MetadataTransform',\n" +
                "  'format' = 'json',\n" +
                "  'scan.startup.mode' = 'earliest-offset'\n" +
                ")";
        tEnv.executeSql(ddl);
    
        // 将 MySQL CDC 表数据写入 Kafka
        String kafkaDDL = "CREATE TABLE kafka_sink (\n" +
                "  `id` INT,\n" +
                "  `name` STRING,\n" +
                "  `age` INT,\n" +
                "  `create_time` TIMESTAMP(3),\n" +
                "  `update_time` TIMESTAMP(3) METADATA VIRTUAL,\n" +
                "  WATERMARK FOR create_time AS create_time\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'flink_cdc_mysql_to_kafka',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'format' = 'json',\n" +
                "  'sink.partitioner' = 'round-robin',\n" +
                "  'sink.buffer-flush.max-rows' = '1'\n" +
                ")";
        tEnv.executeSql(kafkaDDL);
    
        // 执行查询并写入 Kafka
        tEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM mysql_cdc");在 MySQL 中创建一个测试表,插入一些数据。
    

    sql CREATE TABLE user ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(20), age INT, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );

    INSERT INTO user (name, age) VALUES ('张三', 18); INSERT INTO user (name, age) VALUES ('李四', 20); 启动 Kafka 和 Flink 集群,并提交作业。

    shell bin/kafka-server-start.sh config/server.properties

    bin/start-cluster.sh

    mvn exec:java -Dexec.mainClass=com.example.FlinkCDCMySQLToKafka 在 Kafka 中查看数据是否正确。

    shell bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink_cdc_mysql_to_kafka --from-beginning 以上就是一个简单的 Flink CDC MySQL 到 Kafka 的示例。需要注意的是,在实际应用中,还需要考虑更多的问题,例如数据同步的并发性、事务性等等,以保证数据的正确性和完整性。

    2023-04-14 20:32:22
    赞同 展开评论

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系列产品 Serverless 化。RocketMQ 中文社区:https://rocketmq-learning.com/

收录在圈子:
+ 订阅
阿里云RPA历经8年的内部验证,覆盖了阿里巴巴大部分BU,实现了电商客服、新零售等新兴行业的渗透,并且已经完成在保险、金融、医疗保健等领域的场景深耕,联合合作伙伴具备深度定制化能力和稳定交付能力,积累了丰富的行业可行性解决方案。目前阿里云RPA能集成并运行在更高的软件层级,这就决定了它不会侵入、影响已有的软件系统。在帮助企业提升效能的过程中,保持企业已有的IT系统功能平稳、运行可靠。
还有其他疑问?
咨询AI助理