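Download page: https://github.com/ververica/flink-cdc-connectors/releases
This walkthrough uses version 2.2.0: https://github.com/ververica/flink-cdc-connectors/archive/refs/tags/release-
After downloading the source, find the flink.version property in pom.xml and change it to the Flink version you are running: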
<flink.version>1.13.6</flink.version>
Then compile and package the project yourself.
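The pom.xml edit described above can also be scripted. The following is a minimal sketch, not part of the project: `set_flink_version` is a hypothetical helper name, and it assumes the `<flink.version>` property sits on a single line of the file.

```shell
# Hypothetical helper (not shipped with flink-cdc-connectors):
# rewrite the <flink.version> property in a pom.xml.
# Assumption: the opening and closing tags are on the same line.
set_flink_version() {
  pom_file="$1"
  new_version="$2"
  # -i.bak edits in place and keeps a backup copy; this form works
  # with both GNU sed and BSD sed.
  sed -i.bak \
    "s|<flink.version>[^<]*</flink.version>|<flink.version>${new_version}</flink.version>|" \
    "$pom_file"
}
```

From the source root, this would be called as `set_flink_version pom.xml 1.13.6`. A typical Maven build afterwards is `mvn clean install -DskipTests`, though the exact invocation is an assumption here and the project's README should be checked.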
Syncing MySQL data with Flink CDC
1. Prepare the Flink cluster
wget http://mirrors.cloud.tencent.com/apache/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
tar zxvf flink-1.13.6-bin-scala_2.11.tgz
Copy the flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar you just built into Flink's lib directory.
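The copy step can be wrapped in a small check so that a wrong path fails loudly instead of silently. This is only a sketch: `install_connector` is a hypothetical helper, and the location of the built jar (commonly the `target/` directory of the flink-sql-connector-mysql-cdc module) is an assumption that depends on your build.

```shell
# Hypothetical helper: copy a connector jar into <flink home>/lib
# and print the installed path so the result can be verified.
install_connector() {
  jar_path="$1"
  flink_home="$2"
  if [ ! -f "$jar_path" ]; then
    echo "jar not found: $jar_path" >&2
    return 1
  fi
  if [ ! -d "$flink_home/lib" ]; then
    echo "no lib/ directory under: $flink_home" >&2
    return 1
  fi
  cp "$jar_path" "$flink_home/lib/" || return 1
  # List the copied file; ls fails if the copy did not land.
  ls "$flink_home/lib/$(basename "$jar_path")"
}
```

For example, `install_connector path/to/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar flink-1.13.6`. Flink loads jars from lib when it starts, so do this before starting the cluster, or restart the cluster afterwards.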
Start the cluster:
cd flink-1.13.6
bin/start-cluster.sh
2. Prepare the MySQL environment
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products VALUES
  (default,"scooter","Small 2-wheel scooter"),
  (default,"car battery","12V car battery"),
  (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
  (default,"hammer","12oz carpenter's hammer"),
  (default,"hammer","14oz carpenter's hammer"),
  (default,"hammer","16oz carpenter's hammer"),
  (default,"rocks","box of assorted rocks"),
  (default,"jacket","water resistent black wind breaker"),
  (default,"spare tire","24 inch spare tire");
3. Start the Flink SQL client
cd /opt/flink-1.13.6
bin/sql-client.sh
4. Run the following statements in the Flink SQL client
Flink SQL> SET execution.checkpointing.interval = 3s;
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '<your MySQL host IP>',
    'port' = '3306',
    'username' = 'root',
    'password' = '<your password>',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );
Flink SQL> select * from products;
5. Insert more rows from the MySQL client
INSERT INTO products VALUES (default,"scooter1","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter3","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter4","Small 2-wheel scooter");
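6. Check the data in the Flink SQL client
select * from products;
The query result now includes the newly inserted rows, which confirms that changes made in MySQL are being synchronized.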