SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: 本文主要介绍如何通过消息对接, kafkakafka-connect数据平台以及相关插件将数据同步到分析型数据库 AnalyticDB PostgreSQL

背景

SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中。
开启cdc的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部分数据。

CDC的使用条件

1.SQL server 2008及以上的企业版、开发版和评估版;
2.需要开启代理服务(作业)。
3.CDC需要业务库之外的额外的磁盘空间。
4.CDC的表需要主键或者唯一主键。
image
图1:Sqlserver CDC原理

ADB4PG Sink使用条件

  1. 需要提前使用建表语句,在ADB4PG端建表,系统不会自动创建(如果有需要可以加这部分功能)
  2. 每张表需要有主键或唯一主键
  3. 当前支持的数据格式:INTEGER,BIGINT,SMALLINT,NUMERIC,DECIMAL,REAL,DOUBLEPERICISION,BOOLEAN,DATE,TIMESTAMP,VARCHAR

环境准备

SQLServer环境准备

  1. 已有自建SQLServer或云上RDS实例(示例使用云上RDS SQLServer实例)
  2. 已有windows环境,并安装SSMS(SQL Server Management Studio),部分命令需要在SSMS执行

SQLServer环境建表

-- 创建源表
create database connect
GO
use connect
GO  

create table t1
(
    a int NOT NULL PRIMARY KEY,
    b BIGINT,
    c SMALLINT,
    d REAL,
    e FLOAT,
    f DATETIME,
    g VARCHAR
);



-- 开启db级的cdc
exec sp_rds_cdc_enable_db

-- 验证数据库是否开启cdc成功
select * from sys.databases where is_cdc_enabled = 1

-- 对源表开启cdc
exec sp_cdc_enable_table @source_schema='dbo', @source_name='t1', @role_name=null;
AI 代码解读

ADB4PG端创建目标表

CREATE DATABASE connect;

create table t1
(
    a int NOT NULL PRIMARY KEY,
    b BIGINT,
    c SMALLINT,
    d REAL,
    e FLOAT,
    f TIMESTAMP,
    g VARCHAR
);
AI 代码解读

Kafka环境准备

安装Kafka Server

1. 下载kafka安装包,并解压

SQL Server Source Connect目前只支持2.1.0及以上版本的Kafka Connect,故需要安装高版本kafka,实例使用kfakf-2.11-2.1.0。 http://kafka.apache.org/downloads?spm=a2c4g.11186623.2.19.7dd34587dwy89h#2.1.0

2. 编辑$KAFKA_HOME/config/server.properties

修改以下参数

...
## 为每台broker配置一个唯一的id号
broker.id=0

...

## log存储地址
log.dirs=/home/gaia/kafka_2.11-2.1.0/logs

## kafka集群使用的zk地址
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
...
AI 代码解读
3. 启动kafka server
bin/kafka-server-start.sh config/server.properties
AI 代码解读

安装Kafka Connect

1. 修改kafka connect配置文件

修改$KAFKA_HOME/config/connect-distributed.properties

## kafka server地址
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092

## 为kafka connector选定一个消费group id
group.id=

## 安装插件的地址,每次kafka connector启动时会动态加载改路径下的jar包,可以将每个插件单独放到一个子路径
plugin.path=
AI 代码解读

安装需要的kafka-connect插件

1. 将插件jar包放在我们在前面已经配置过的配置的plugin.path路径

sqlserver-source-connector

https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/?spm=a2c4g.11186623.2.18.7dd34587dwy89h

oss-sink-connector, 需要使用代码自行编译,注意在pom修改依赖的kafka及scala版本号

https://github.com/aliyun/kafka-connect-oss

adb4pg-jdbc-sink-connector,需要下载以下jar包及对应ADB For PG的JDBC驱动
https://yq.aliyun.com/attachment/download/?spm=a2c4e.11153940.0.0.70ed10daVH6ZQO&id=7282

2. 编辑配置文件
# CDC connector的配置文件 sqlserver-cdc-source.json
▽
{
    "name": "sqlserver-cdc-source",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "database.server.name" : "server1",
        "database.hostname" : "database hostname",
        "database.port" : "1433",
        "database.user" : "xxxx",
        "database.password" : "xxxxxx",
        "database.dbname" : "connect",
        "schemas.enable" : "false",
        "mode":"incrementing",
        "incrementing.column.name":"a",
        "database.history.kafka.bootstrap.servers" : "kafka-broker:9092",
        "database.history.kafka.topic": "server1.dbo.t1",
        "value.converter.schemas.enable":"false",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter"
    }
}
AI 代码解读
# oss sink的配置文件 oss-sink.json
{
     "name":"oss-sink",
     "config": {
        "name":"oss-sink",
        "topics":"server1.dbo.testdata",
        "connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",
        "format.class":"com.aliyun.oss.connect.kafka.format.json.JsonFormat",
        "flush.size":"1",
        "tasks.max":"4",
        "storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",
        "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "timestamp.extractor":"Record",
        "oss.bucket":"traffic-csv",
        "partition.duration.ms":"10000",
        "path.format":"YYYY-MM-dd-HH",
        "locale":"US",
        "timezone":"Asia/Shanghai",
        "rotate.interval.ms":"30000"
        }
}
AI 代码解读

有关oss sinker更详尽的配置,见文档 https://github.com/aliyun/kafka-connect-oss

## adb4pg-jdbc-sink配置文件
{
     "name":"adb4pg-jdbc-sink",
     "config": {
        "name":"adb4pg-jdbc-sink",
        "topics":"server1.dbo.t1",
        "connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
        "connection.url":"jdbc:postgresql://gp-8vb8xi62lohhh2777o.gpdb.zhangbei.rds.aliyuncs.com:3432/connect",
        "connection.user":"xxx",
        "connection.password":"xxxxxx",
        "col.names":"a,b,c,d,e,f,g",
        "col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
        "pk.fields":"a",
        "target.tablename":"t1",
        "tasks.max":"1",
        "auto.create":"false",
        "table.name.format":"t1",
        "batch.size":"1"
        }
}
AI 代码解读

由于OSS sinker使用了hdfs封装的FileSystem,需要将OSS相关的信息维护到$KAFKA_HOME/config/core-site.xml文件中

<configuration>
    <property>
        <name>fs.oss.endpoint</name>
        <value>xxxxxxx</value>
    </property>
    <property>
        <name>fs.oss.accessKeyId</name>
        <value>xxxxxxx</value>
    </property>
    <property>
        <name>fs.oss.accessKeySecret</name>
        <value>xxxxxxx</value>
    </property>
    <property>
        <name>fs.oss.impl</name>
        <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
    </property>
    <property>
        <name>fs.oss.buffer.dir</name>
        <value>/tmp/oss</value>
    </property>
    <property>
        <name>fs.oss.connection.secure.enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>fs.oss.connection.maximum</name>
        <value>2048</value>
    </property>
</configuration>
AI 代码解读
3. 启动已经配置好的kafka-connector插件

启动及删除connect任务命令

## 启动命令
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-cdc-source.json
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-jdbc-sink.json
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @oss-sink.json

## 删除命令
curl -s -X DELETE http://localhost:8083/connectors/sqlserver-cdc-source
curl -s -X DELETE http://localhost:8083/connectors/adb4pg-jdbc-sink
curl -s -X DELETE http://localhost:8083/connectors/oss-sink
AI 代码解读

在ADB For PG获取更新数据

SQLServer插入赠/更/删数据记录

insert into t1(a,b,c,d,e,f,g) values(1, 2, 3, 4, 5, convert(datetime,'24-12-19 10:34:09 PM',5), 'h');
AI 代码解读

在kafka topic获取更新结果

先确认是否生成了kafka-connect所需的topic信息

bin/kafka-topics.sh --zookeeper zk_address --list
AI 代码解读

image
如截图,connect-configs, connect-offsets, connect-status为kafka-connect用来存储任务数据更新状态的topic。schema-changes-inventory是维护sqlserver表结构的topic。
可以通过kafka consloe-consumer上获取到的topic信息,以确认cdc数据正确被采集到kafka topic

bin/kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 --topic server1.dbo.t1
AI 代码解读

在ADB For PG上查询同步过来的数据

注意:因为是不同数据库之间的同步,时区设置的不同可能会导致同步结果产生时区偏移,需要在两侧数据库做好设置。
image

在OSS查看更新的数据

image

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
打赏
0
1
0
0
39
分享
相关文章
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
303 0
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
287 9
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
624 2
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
723 1
实时计算 Flink版操作报错合集之cdc postgres数据库,当表行记录修改后报错,该如何修改
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版产品使用问题之在从SQLServer捕获数据变更(CDC)时,开启CDC功能后对应的表中没有记录变化数据,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之从MySQL数据库中捕获变更数据并进行实时处理如何按天分表同步CDC数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之SQL Server CDC是否支持抽取SQL Server视图
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
docker拉取MySQL后数据库连接失败解决方案
通过以上方法,可以解决Docker中拉取MySQL镜像后数据库连接失败的常见问题。关键步骤包括确保容器正确启动、配置正确的环境变量、合理设置网络和权限,以及检查主机防火墙设置等。通过逐步排查,可以快速定位并解决连接问题,确保MySQL服务的正常使用。
115 82

相关产品

  • 云数据库 RDS PostgreSQL 版