01 引言
如果要使用flink cdc做sqlserver的实时同步,需要满足以下条件:
- 需要安装SQLServer(需要支持CDC的功能,SQLServer 2008之后的版本都支持);
- 需要开启SQL Server代理;
- 启用CDC功能。
ok,接下来开始讲解。
02 SQLServer安装
首先需要先安装SqlServer(使用的是2019版本),有兴趣的同学可以参考博主之前写的《Docker下安装SqlServer2019》。
主要就是两个步骤:
## 拉取最新镜像 docker pull mcr.microsoft.com/mssql/server:2019-latest ## 运行 SQL Server 容器(密码必须是8个字符,并包含字母、数字和特殊字符,如:abc@123456 ,下面映射主机端口为30027) docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
03 开启SQLServer代理
首先使用root用户进入容器:
docker exec -it --user root sql_server_2019 bash
进入容器后,执行命令启用SqlServeragent:
/opt/mssql/bin/mssql-conf set sqlagent.enabled true
退出,并重启容器:
exit docker restart sql_server_2019
具体操作如下:
04 开启CDC功能
step1:创建’cdc_test’数据库,并使用连接工具登录该数据库,使用以下 SQL 命令启用 CDC 功能:
-- 创建数据库 CREATE DATABASE cdc_test; -- 启用CDC功能 EXEC sys.sp_cdc_enable_db; -- 判断当前数据库是否启用了CDC(如果返回1,表示已启用) SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
step2:选择要进行 CDC 跟踪的表(这里使用orders表作为演示
)
-- 创建示例表(orders) CREATE TABLE orders ( id int, order_date date, purchaser int, quantity int, product_id int, PRIMARY KEY ([id]) ); -- schema_name 是表所属的架构(schema)的名称。 -- table_name 是要启用 CDC 跟踪的表的名称。 -- cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。 EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = 'cdc_role';
执行结果如下:
step3:启用 CDC 后,SQL Server 将自动跟踪启用了 CDC 的表上的数据更改,并将更改信息存储在 CDC 相关的表中,您可以使用这些信息进行数据更改追踪和同步。
-- 查询在当前数据库下所有的表: SELECT * FROM INFORMATION_SCHEMA.TABLES
05 Flink SQL
ok,现在可以写FlinkSQL了,如下:
-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据 CREATE TABLE t_source_sqlserver ( id INT, order_date DATE, purchaser INT, quantity INT, product_id INT, PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选) ) WITH ( 'connector' = 'sqlserver-cdc', -- 使用SQL Server CDC连接器 'hostname' = '10.194.183.120', -- SQL Server主机名 'port' = '30027', -- SQL Server端口 'username' = 'sa', -- SQL Server用户名 'password' = 'abc@123456', -- SQL Server密码 'database-name' = 'cdc_test', -- 数据库名称 'schema-name' = 'dbo', -- 模式名称 'table-name' = 'orders' -- 要捕获更改的表名 ); -- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库 CREATE TABLE table_sink_mysql ( id INT, order_date DATE, purchaser INT, quantity INT, product_id INT, PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选) ) WITH ( 'connector' = 'jdbc', -- 使用JDBC连接器 'url' = 'jdbc:mysql://10.194.183.120:30025/test', -- MySQL的JDBC URL 'username' = 'root', -- MySQL用户名 'password' = 'root', -- MySQL密码 'table-name' = 'orders' -- 要写入的MySQL表名 ); -- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中 INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
启动程序,一切正常:
06 验证
验证新增:
验证修改:
验证删除: