开发者社区 > 云原生 > 云消息队列 > 正文

大佬们,flink cdc msyql 到kafka,有好的案例或教程么?

大佬们,flink cdc msyql 到kafka,有好的案例或教程么?

展开
收起
毛毛虫雨 2022-11-27 09:58:54 501 0
1 条回答
写回答
取消 提交回答
  • 全栈JAVA领域创作者

    当然有,以下是一个简单的 Flink CDC MySQL 到 Kafka 的示例:

    在 Maven 中添加以下依赖项:

    xml org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-jdbc_${scala.binary.version} ${flink.version} org.apache.flink flink-formats-json ${flink.version} mysql mysql-connector-java 8.0.27 创建一个 Flink SQL Job,读取 MySQL CDC 数据并将其写入 Kafka。

    java import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkCDCMySQLToKafka {

    public static void main(String[] args) throws Exception {
        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
        // 注册 MySQL CDC 表
        String ddl = "CREATE TABLE mysql_cdc (\n" +
                "  `id` INT,\n" +
                "  `name` STRING,\n" +
                "  `age` INT,\n" +
                "  `create_time` TIMESTAMP(3),\n" +
                "  `update_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 'user',\n" +
                "  'debezium.snapshot.locking.mode' = 'none',\n" +
                "  'debezium.snapshot.mode' = 'initial',\n" +
                "  'debezium.table.ignore-builtin' = 'true',\n" +
                "  'debezium.transforms' = 'unwrap,addMetadata',\n" +
                "  'debezium.transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',\n" +
                "  'debezium.transforms.addMetadata.type' = 'org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema$MetadataTransform',\n" +
                "  'format' = 'json',\n" +
                "  'scan.startup.mode' = 'earliest-offset'\n" +
                ")";
        tEnv.executeSql(ddl);
    
        // 将 MySQL CDC 表数据写入 Kafka
        String kafkaDDL = "CREATE TABLE kafka_sink (\n" +
                "  `id` INT,\n" +
                "  `name` STRING,\n" +
                "  `age` INT,\n" +
                "  `create_time` TIMESTAMP(3),\n" +
                "  `update_time` TIMESTAMP(3) METADATA VIRTUAL,\n" +
                "  WATERMARK FOR create_time AS create_time\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'flink_cdc_mysql_to_kafka',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'format' = 'json',\n" +
                "  'sink.partitioner' = 'round-robin',\n" +
                "  'sink.buffer-flush.max-rows' = '1'\n" +
                ")";
        tEnv.executeSql(kafkaDDL);
    
        // 执行查询并写入 Kafka
        tEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM mysql_cdc");在 MySQL 中创建一个测试表,插入一些数据。
    

    sql CREATE TABLE user ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(20), age INT, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );

    INSERT INTO user (name, age) VALUES ('张三', 18); INSERT INTO user (name, age) VALUES ('李四', 20); 启动 Kafka 和 Flink 集群,并提交作业。

    shell bin/kafka-server-start.sh config/server.properties

    bin/start-cluster.sh

    mvn exec:java -Dexec.mainClass=com.example.FlinkCDCMySQLToKafka 在 Kafka 中查看数据是否正确。

    shell bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink_cdc_mysql_to_kafka --from-beginning 以上就是一个简单的 Flink CDC MySQL 到 Kafka 的示例。需要注意的是,在实际应用中,还需要考虑更多的问题,例如数据同步的并发性、事务性等等,以保证数据的正确性和完整性。

    2023-04-14 20:32:22
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 Kafka 版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载