MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
简介: MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

架构图

5e84632f3893225f387324583d2a7121_20211028-54618ca6-bf75-4a5b-be66-8089957ba034.png

一、 配置安装MogDB和wal2json 插件

二、 启动KAFKA

 cd ./kafka/bin
 [root@mogdb-mdb-0003 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
 [root@mogdb-mdb-0003 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

三、 编辑配置文件

cat og.json

{ "name": "mtk-connector",    // The name of the connector when registered with a Kafka Connect service.
  "config": {
    "schema.include.list": "lee", // sycn schema 
    "plugin.name": "wal2json",     // plugin 
    "database.password": "Enmo@123", 
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", // kafka address
    "database.hostname": "192.168.1.11", // The address of the PostgreSQL server.
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // The name of this PostgreSQL connector class.
    "database.user": "lee",  // The name of the PostgreSQL user that has the required privileges.
    "slot.name": "wal2json1", // slot unique
    "snapshot.mode": "never",
    "database.server.name": "opengauss", // The logical name of the MogDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
    "database.port": "18000", // The port number of the PostgreSQL server.
    "tasks.max": "1",
    "table.include.list": "lee.test", // A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
    "database.dbname": "postgres"  // The name of the PostgreSQL database to connect to
  }
} 

四、 启动 DEBEZIUM 并且注册

1. 修改配置文件插件路径

cat /root/kafka/config/connect-distributed.properties
plugin.path=/root/kafka/connect

2. 启动DBZ程序

[root@mogdb-mdb-0003 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

3. 注册DBZ信息

curl -X "POST" "http://21.47.30.225:8083/connectors" -H 'content-type: application/json;charset=UTF-8' -d @og.json.kafka

4. 消费测试

  • 查看topic_name 源端要先插入一条数据
./kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 消费测试
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic OPENGAUSS.lee.test_og

5. 其他

  • 删除DBZ信息
curl -X "DELETE" "http://localhost:8083/connectors/mtk-connector1"
  • 删除slot
删除slot
postgres=# select slot_name from pg_get_replication_slots();
         slot_name         
---------------------------
 standby_192.168.1.12_26001 
 primary_192.168.1.12_26001 
 wal2json1                 
(3 rows)
postgres=# select * from pg_drop_replication_slot('wal2json1');
  • 查看offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --describe
  • 清除offset
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --group worker-oracle --topic opengauss5.lee.test --reset-offsets --to-earliest --execute
  • 删除topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.1:9092  --topic OPENGAUSS.lee.test_og

五、配置Datawork同步

1. 配置JAVA环境变量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME

2. 编辑 dataworker.properties

mtk.topicPattern=og // 无需修改
mtk.groupId=worker-oracle //不同进程不一样
mtk.bootstrapServer=127.0.0.1:9092 // kafka 地址
mtk.maxPollRecordsConfig=10000 最大拉取数量
mtk.maxPartitionFetchBytesConfig=104857600 最大拉取数据大小
mtk.source=MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123
mtk.target=MYSQL:192.168.1.12:3306:root:root:Enmo@123
mtk.includeSchema=lee
mtk.includeTable=lee.test
  • target 支持PostgreSQL MySQL Oracle openGauss MogDB

3. 启动dataworker

java -jar mtk-data-worker.jar

六、 同步程序移植到其他Kafka(可选)

1. 拷贝整个connect文件夹和配置文件

  • connect 文件夹
  • config/connect-distributed.properties
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/root/datawork/kafka/connect

七、消费到其他kafka(可选)

  • 修改DBZ配置文件
[root@v-21-47-30-225 config]#  grep -i bootstrap.servers  /root/kafka/config/connect-distributed.properties
# the `bootstrap.servers` and those specifying replication factors.
bootstrap.servers=192.168.1.1:9092
  • 注册信息
    cat og.json.kafka
{ "name": "og",
  "config": {
    "schema.include.list": "lee",
    "plugin.name": "wal2json",
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.hostname": "192.168.1.11",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "lee",
    "slot.name": "wal2json1",
    "snapshot.mode": "never",
    "database.server.name": "OPENGAUSS",
    "database.port": "26000",
    "tasks.max": "1",
    "table.include.list": "lee.test_OG",
    "database.dbname": "test"
  }
}

八、 部分日志截图

  • 同步日志
  • 5fb1f30a4f352be891d4cc4eda8fd3bf_20211028-048c91f9-d8e3-4581-9ea7-08dd29928357.png
目录
相关文章
|
存储 Oracle 关系型数据库
牛刀小试Oracle GoldenGate--OGG介绍(零)
1.Oracle GoldenGate 介绍 1.1 Oracle GoldenGate处理方法和支持的数据库 Oracle GoldenGate在多样和异构的基础IT平台中,可以在事务级别上进行数据交换和数据操作。
1825 0
|
7天前
|
关系型数据库 Java 数据库
【YashanDB 知识库】kettle 同步 PG 至崖山提示 no encryption pg_hba.conf 记录
【问题分类】数据导入导出 【关键字】数据同步,kettle,数据迁移,pg_hba.conf 【问题描述】使用 kettle 同步 postgresql 至崖山数据库时提示以下报错信息: 信息: New Caching Service registered 2024/10/10 17:00:21 - Pan - 开始运行. 2024/10/10 17:00:21 - public.T1 - 为了转换解除补丁开始 [public.T1] log4j:ERROR No output stream or file set for the appender named [pdi-execu
|
4月前
|
SQL 关系型数据库 MySQL
介绍5款 世界范围内比较广的 5款 mysql Database Management Tool
介绍5款 世界范围内比较广的 5款 mysql Database Management Tool
252 0
|
8月前
|
存储 关系型数据库 MySQL
【MySQL技术内幕】4.5-Named File Formats机制
【MySQL技术内幕】4.5-Named File Formats机制
69 2
|
9月前
|
SQL 关系型数据库 MySQL
⑩⑨【Tool】MySQL常用客户端管理工具:mysql、mysqladmin、mysqlbinlog、mysqlshow、mysqldump、mysqlimport、source
⑩⑨【Tool】MySQL常用客户端管理工具:mysql、mysqladmin、mysqlbinlog、mysqlshow、mysqldump、mysqlimport、source
128 0
|
Oracle 关系型数据库 数据库
MogDB/openGauss 生态工具-MTK(Migration ToolKit) 数据库迁移
MogDB/openGauss 生态工具-MTK(Migration ToolKit) 数据库迁移
386 0
MogDB/openGauss 生态工具-MTK(Migration ToolKit) 数据库迁移
|
Web App开发 SQL Java