MogDB/openGauss Real-Time Synchronization Tool: MDB (MogDB Data Bridge)

Architecture diagram

5e84632f3893225f387324583d2a7121_20211028-54618ca6-bf75-4a5b-be66-8089957ba034.png

I. Install and configure MogDB and the wal2json plugin

II. Start Kafka

 cd ./kafka/bin
 [root@mogdb-mdb-0003 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
 [root@mogdb-mdb-0003 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

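III. Edit the configuration file

The // comments below are annotations only. JSON does not allow comments, so remove them before registering the file.

cat og.json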

{ "name": "mtk-connector",    // The name of the connector when registered with a Kafka Connect service.
  "config": {
    "schema.include.list": "lee", // Schema(s) to synchronize.
    "plugin.name": "wal2json",     // Logical decoding plugin.
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", // Kafka address.
    "database.hostname": "192.168.1.11", // The address of the MogDB server.
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // The name of the PostgreSQL connector class.
    "database.user": "lee",  // The database user that has the required privileges.
    "slot.name": "wal2json1", // Replication slot name; must be unique.
    "snapshot.mode": "never",
    "database.server.name": "opengauss", // The logical name of the MogDB server/cluster. It forms a namespace and is used in the names of the Kafka topics the connector writes to, in the Kafka Connect schema names, and in the namespaces of the corresponding Avro schemas when the Avro converter is used.
    "database.port": "18000", // The port number of the MogDB server.
    "tasks.max": "1",
    "table.include.list": "lee.test", // The tables this connector monitors. Optional; other properties can include or exclude schemas and tables.
    "database.dbname": "postgres"  // The name of the database to connect to.
  }
}
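
Because the annotated og.json above is not valid JSON, it has to be cleaned before it can be posted. The following is a minimal sketch of one way to do that, not part of MDB itself: the helper name strip_line_comments and the shortened sample text are assumptions made for illustration. It removes // comments while leaving any // inside a string value (for example in a URL) untouched.

```python
import json
import re

# Match either a complete JSON string literal (kept) or a // comment (dropped).
_TOKEN = re.compile(r'"(?:\\.|[^"\\])*"|//[^\n]*')


def strip_line_comments(text):
    """Remove // comments that sit outside JSON string literals."""
    def keep_strings(match):
        token = match.group(0)
        return token if token.startswith('"') else ""
    return _TOKEN.sub(keep_strings, text)


# Shortened, hypothetical excerpt of the annotated og.json.
annotated = '''
{ "name": "mtk-connector",  // connector name
  "config": {
    "plugin.name": "wal2json",  // logical decoding plugin
    "database.hostname": "192.168.1.11"
  }
}
'''

connector = json.loads(strip_line_comments(annotated))
print(connector["config"]["plugin.name"])
```

Matching string literals first is what keeps the // in a value such as "http://..." from being mistaken for a comment.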

IV. Start Debezium and register the connector

1. Set the plugin path in the configuration file

cat /root/kafka/config/connect-distributed.properties
plugin.path=/root/kafka/connect

2. Start Debezium (DBZ)

[root@mogdb-mdb-0003 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

3. Register the connector

curl -X "POST" "http://21.47.30.225:8083/connectors" -H 'content-type: application/json;charset=UTF-8' -d @og.json.kafka
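
The curl call above is a POST of the connector JSON to the /connectors endpoint of the Kafka Connect REST service. As a rough sketch of the same request in Python, the function below builds the request without sending it. The function name build_register_request, the localhost:8083 address, and the shortened connector dictionary are assumptions for illustration.

```python
import json
import urllib.request


def build_register_request(connect_url, connector):
    """Build (but do not send) the POST that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json;charset=UTF-8"},
        method="POST",
    )


# Hypothetical, shortened connector definition.
connector = {
    "name": "mtk-connector",
    "config": {"plugin.name": "wal2json", "slot.name": "wal2json1"},
}

request = build_register_request("http://localhost:8083", connector)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

Keeping the build step separate from the send step makes it easy to inspect the URL and body before anything reaches the Connect service.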

4. Consumption test

  • List the topic names (insert at least one row on the source first so that the topic exists)
./kafka-topics.sh --bootstrap-server localhost:9092 --list
  • Consume from the topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic OPENGAUSS.lee.test_og
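
The topic name passed to --topic follows the Debezium pattern of server name, schema, and table joined by dots. The sketch below derives the expected topic names from a connector config. The function name expected_topics is hypothetical, and the lowercase folding of the table name is an assumption inferred from this article's own example, where the table lee.test_OG appears as the topic OPENGAUSS.lee.test_og.

```python
def expected_topics(config, fold_case=True):
    """List the topic names implied by a connector config's table.include.list."""
    server = config["database.server.name"]
    tables = [t.strip() for t in config["table.include.list"].split(",") if t.strip()]
    if fold_case:
        # Assumption: unquoted identifiers are folded to lowercase by the database.
        tables = [t.lower() for t in tables]
    return [f"{server}.{table}" for table in tables]


config = {
    "database.server.name": "OPENGAUSS",
    "table.include.list": "lee.test_OG",
}
print(expected_topics(config))
```

If the listed topics differ from what this returns, compare against the output of kafka-topics.sh --list rather than relying on the sketch.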

5. Other operations

  • Delete the connector registration (use the name the connector was registered under)
curl -X "DELETE" "http://localhost:8083/connectors/mtk-connector1"
  • Drop the replication slot
postgres=# select slot_name from pg_get_replication_slots();
         slot_name         
---------------------------
 standby_192.168.1.12_26001 
 primary_192.168.1.12_26001 
 wal2json1                 
(3 rows)
postgres=# select * from pg_drop_replication_slot('wal2json1');
  • View the consumer group offsets
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --describe
  • Reset the consumer group offsets (stop dataworker first; the group must be inactive)
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --topic opengauss5.lee.test --reset-offsets --to-earliest --execute
  • Delete the topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.1:9092  --topic OPENGAUSS.lee.test_og

V. Configure dataworker synchronization

1. Set the Java environment variables

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME

2. Edit dataworker.properties

# Topic pattern; no need to change
mtk.topicPattern=og
# Consumer group ID; use a different value for each dataworker process
mtk.groupId=worker-oracle
# Kafka address
mtk.bootstrapServer=127.0.0.1:9092
# Maximum number of records fetched per poll
mtk.maxPollRecordsConfig=10000
# Maximum size of fetched data in bytes (104857600 = 100 MiB)
mtk.maxPartitionFetchBytesConfig=104857600
mtk.source=MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123
mtk.target=MYSQL:192.168.1.12:3306:root:root:Enmo@123
mtk.includeSchema=lee
mtk.includeTable=lee.test
  • target supports PostgreSQL, MySQL, Oracle, openGauss, and MogDB
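
The mtk.source and mtk.target values pack a whole connection into one colon-separated string. The sketch below splits such a value into named fields. The field order (type, host, port, database, user, password) is an assumption read off the two examples above, not a documented format, and the names Endpoint and parse_endpoint are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Endpoint:
    db_type: str
    host: str
    port: int
    database: str
    user: str
    password: str


def parse_endpoint(value):
    """Split TYPE:host:port:database:user:password (assumed order) into fields."""
    # maxsplit=5 keeps any further colons inside the password.
    parts = value.split(":", 5)
    if len(parts) != 6:
        raise ValueError(f"expected 6 colon-separated fields, got {len(parts)}")
    db_type, host, port, database, user, password = parts
    return Endpoint(db_type, host, int(port), database, user, password)


source = parse_endpoint("MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123")
print(source.db_type, source.host, source.port, source.database, source.user)
```

A quick check like this catches a missing field before dataworker is started.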

3. Start dataworker

java -jar mtk-data-worker.jar

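VI. Move the synchronization program to another Kafka installation (optional)

1. Copy the entire connect folder and the configuration file

  • The connect folder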
  • config/connect-distributed.properties
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/root/datawork/kafka/connect

VII. Publish to a different Kafka cluster (optional)

  • Edit the Debezium (Kafka Connect) configuration file
[root@v-21-47-30-225 config]#  grep -i bootstrap.servers  /root/kafka/config/connect-distributed.properties
# the `bootstrap.servers` and those specifying replication factors.
bootstrap.servers=192.168.1.1:9092
  • Connector registration file
    cat og.json.kafka
{ "name": "og",
  "config": {
    "schema.include.list": "lee",
    "plugin.name": "wal2json",
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.hostname": "192.168.1.11",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "lee",
    "slot.name": "wal2json1",
    "snapshot.mode": "never",
    "database.server.name": "OPENGAUSS",
    "database.port": "26000",
    "tasks.max": "1",
    "table.include.list": "lee.test_OG",
    "database.dbname": "test"
  }
}
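
To see what actually changes between the og.json in section III and the og.json.kafka above, a small dictionary comparison is enough. This is only an illustrative sketch: the function name changed_keys is hypothetical, and the two dictionaries hold a subset of the keys, with values copied from the two files in this article.

```python
def changed_keys(old, new):
    """Return {key: (old_value, new_value)} for every key whose value differs."""
    keys = sorted(set(old) | set(new))
    return {k: (old.get(k), new.get(k)) for k in keys if old.get(k) != new.get(k)}


# Subset of the "config" section of og.json (section III).
local_kafka = {
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.hostname": "192.168.1.11",
    "database.port": "18000",
    "database.dbname": "postgres",
    "slot.name": "wal2json1",
}

# The same keys from og.json.kafka.
remote_kafka = {
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.hostname": "192.168.1.11",
    "database.port": "26000",
    "database.dbname": "test",
    "slot.name": "wal2json1",
}

for key, (old, new) in changed_keys(local_kafka, remote_kafka).items():
    print(f"{key}: {old} -> {new}")
```

Only the Kafka address is required by this section; the other differences reflect the different database used in the article's second example.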

VIII. Sample log screenshots

  • Synchronization log
  • 5fb1f30a4f352be891d4cc4eda8fd3bf_20211028-048c91f9-d8e3-4581-9ea7-08dd29928357.png