Hey folks, does anyone have a good example or tutorial for Flink CDC from MySQL to Kafka?
Sure, here is a simple Flink CDC MySQL-to-Kafka example.

Add the following dependencies to your Maven POM:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- The 'mysql-cdc' table connector used below is provided by the Flink CDC
     project (groupId com.ververica); set the version property to the release
     that matches your Flink version -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
```

Create a Flink SQL job that reads the MySQL CDC stream and writes it to Kafka:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCDCMySQLToKafka {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for the CDC source's fault-tolerance guarantees
        env.enableCheckpointing(10_000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the MySQL CDC source table. All five columns are physical
        // columns of the MySQL table; if you also want binlog metadata, the
        // mysql-cdc connector exposes 'database_name', 'table_name' and
        // 'op_ts' as METADATA columns.
        String ddl = "CREATE TABLE mysql_cdc (\n" +
                "  `id` INT,\n" +
                "  `name` STRING,\n" +
                "  `age` INT,\n" +
                "  `create_time` TIMESTAMP(3),\n" +
                "  `update_time` TIMESTAMP(3),\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 'user',\n" +
                "  'scan.startup.mode' = 'initial'\n" +
                ")";
        tEnv.executeSql(ddl);

        // Register the Kafka sink table. A CDC source emits updates and deletes
        // as well as inserts, so the plain 'json' format would be rejected here;
        // 'debezium-json' can encode the full changelog.
        String kafkaDDL = "CREATE TABLE kafka_sink (\n" +
                "  `id` INT,\n" +
                "  `name` STRING,\n" +
                "  `age` INT,\n" +
                "  `create_time` TIMESTAMP(3),\n" +
                "  `update_time` TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'flink_cdc_mysql_to_kafka',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'format' = 'debezium-json'\n" +
                ")";
        tEnv.executeSql(kafkaDDL);

        // Pipe the change stream into Kafka; await() blocks until the job stops
        tEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM mysql_cdc").await();
    }
}
```
Create a test table in MySQL and insert some data:

```sql
CREATE TABLE user (
  id INT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(20),
  age INT,
  create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

INSERT INTO user (name, age) VALUES ('张三', 18);
INSERT INTO user (name, age) VALUES ('李四', 20);
```
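The `mysql-cdc` connector reads MySQL's binlog, so the server must have the binlog enabled with `binlog_format=ROW`, and the connecting user needs the `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges. A quick sanity check over plain JDBC (a hypothetical helper, reusing the connection settings from the job above):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/test", "root", "123456");
             Statement stmt = conn.createStatement();
             // 'log_bin' should be ON and 'binlog_format' should be ROW
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")) {
            while (rs.next()) {
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}
```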
Start the Kafka and Flink clusters and submit the job:

```shell
bin/kafka-server-start.sh config/server.properties
bin/start-cluster.sh
mvn exec:java -Dexec.mainClass=com.example.FlinkCDCMySQLToKafka
```

Note that `mvn exec:java` runs the job in an embedded local environment; to run it on the cluster you just started, package the job as a jar and submit it with `bin/flink run` instead.
Check that the data has arrived in Kafka:

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink_cdc_mysql_to_kafka --from-beginning
```

With the `debezium-json` format, each change event shows up as an envelope with `before`, `after` and `op` fields. That is all there is to a simple Flink CDC MySQL-to-Kafka example. Keep in mind that a real deployment also has to consider things such as sync parallelism and transactional delivery to guarantee the correctness and completeness of the data; one way to get end-to-end exactly-once delivery is sketched after this paragraph.
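For the transactional-delivery point: with the DataStream variant above, end-to-end exactly-once can be achieved by pairing checkpointing with Kafka transactions. A minimal sketch, assuming Flink's `KafkaSink` and the same topic and broker as before (the class name and transactional-id prefix are arbitrary choices, not fixed API names):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSink {

    // Drop-in replacement for the sink in the DataStream sketch above
    public static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink_cdc_mysql_to_kafka")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Kafka transactions are committed when a checkpoint completes,
                // so checkpointing must be enabled on the environment
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Required for EXACTLY_ONCE; any prefix unique to this job
                .setTransactionalIdPrefix("mysql-cdc-to-kafka")
                // Must exceed the checkpoint interval and stay within the
                // broker's transaction.max.timeout.ms
                .setProperty("transaction.timeout.ms", "900000")
                .build();
    }
}
```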