MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

架构图

5e84632f3893225f387324583d2a7121_20211028-54618ca6-bf75-4a5b-be66-8089957ba034.png

一、 配置安装MogDB和wal2json 插件

二、 启动KAFKA

 cd ./kafka/bin
 [root@mogdb-mdb-0003 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
 [root@mogdb-mdb-0003 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

三、 编辑配置文件

cat og.json

{ "name": "mtk-connector",    // The name of the connector when registered with a Kafka Connect service.
  "config": {
    "schema.include.list": "lee", // sycn schema 
    "plugin.name": "wal2json",     // plugin 
    "database.password": "Enmo@123", 
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", // kafka address
    "database.hostname": "192.168.1.11", // The address of the PostgreSQL server.
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // The name of this PostgreSQL connector class.
    "database.user": "lee",  // The name of the PostgreSQL user that has the required privileges.
    "slot.name": "wal2json1", // slot unique
    "snapshot.mode": "never",
    "database.server.name": "opengauss", // The logical name of the MogDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
    "database.port": "18000", // The port number of the PostgreSQL server.
    "tasks.max": "1",
    "table.include.list": "lee.test", // A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
    "database.dbname": "postgres"  // The name of the PostgreSQL database to connect to
  }
} 

四、 启动 DEBEZIUM 并且注册

1. 修改配置文件插件路径

cat /root/kafka/config/connect-distributed.properties
plugin.path=/root/kafka/connect

2. 启动DBZ程序

[root@mogdb-mdb-0003 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

3. 注册DBZ信息

curl -X "POST" "http://21.47.30.225:8083/connectors" -H 'content-type: application/json;charset=UTF-8' -d @og.json.kafka

4. 消费测试

  • 查看topic_name 源端要先插入一条数据
./kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 消费测试
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic OPENGAUSS.lee.test_og

5. 其他

  • 删除DBZ信息
curl -X "DELETE" "http://localhost:8083/connectors/mtk-connector1"
  • 删除slot
删除slot
postgres=# select slot_name from pg_get_replication_slots();
         slot_name         
---------------------------
 standby_192.168.1.12_26001 
 primary_192.168.1.12_26001 
 wal2json1                 
(3 rows)
postgres=# select * from pg_drop_replication_slot('wal2json1');
  • 查看offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --describe
  • 清除offset
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --group worker-oracle --topic opengauss5.lee.test --reset-offsets --to-earliest --execute
  • 删除topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.1:9092  --topic OPENGAUSS.lee.test_og

五、配置Datawork同步

1. 配置JAVA环境变量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME

2. 编辑 dataworker.properties

mtk.topicPattern=og // 无需修改
mtk.groupId=worker-oracle //不同进程不一样
mtk.bootstrapServer=127.0.0.1:9092 // kafka 地址
mtk.maxPollRecordsConfig=10000 最大拉取数量
mtk.maxPartitionFetchBytesConfig=104857600 最大拉取数据大小
mtk.source=MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123
mtk.target=MYSQL:192.168.1.12:3306:root:root:Enmo@123
mtk.includeSchema=lee
mtk.includeTable=lee.test
  • target 支持PostgreSQL MySQL Oracle openGauss MogDB

3. 启动dataworker

java -jar mtk-data-worker.jar

六、 同步程序移植到其他Kafka(可选)

1. 拷贝整个connect文件夹和配置文件

  • connect 文件夹
  • config/connect-distributed.properties
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/root/datawork/kafka/connect

七、消费到其他kafka(可选)

  • 修改DBZ配置文件
[root@v-21-47-30-225 config]#  grep -i bootstrap.servers  /root/kafka/config/connect-distributed.properties
# the `bootstrap.servers` and those specifying replication factors.
bootstrap.servers=192.168.1.1:9092
  • 注册信息
    cat og.json.kafka
{ "name": "og",
  "config": {
    "schema.include.list": "lee",
    "plugin.name": "wal2json",
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.hostname": "192.168.1.11",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "lee",
    "slot.name": "wal2json1",
    "snapshot.mode": "never",
    "database.server.name": "OPENGAUSS",
    "database.port": "26000",
    "tasks.max": "1",
    "table.include.list": "lee.test_OG",
    "database.dbname": "test"
  }
}

八、 部分日志截图

  • 同步日志
  • 5fb1f30a4f352be891d4cc4eda8fd3bf_20211028-048c91f9-d8e3-4581-9ea7-08dd29928357.png
目录
相关文章
|
关系型数据库 PostgreSQL Docker
harbor-db重启报错:initdb: error: directory "/var/lib/postgresql/da..
harbor-db重启报错:initdb: error: directory "/var/lib/postgresql/da..
487 1
|
6月前
|
存储 NoSQL 数据库
clickhouse的BACKUP/RESTORE命令介绍
clickhouse的BACKUP/RESTORE命令介绍
346 0
|
6月前
|
SQL 关系型数据库 MySQL
⑩⑨【Tool】MySQL常用客户端管理工具:mysql、mysqladmin、mysqlbinlog、mysqlshow、mysqldump、mysqlimport、source
⑩⑨【Tool】MySQL常用客户端管理工具:mysql、mysqladmin、mysqlbinlog、mysqlshow、mysqldump、mysqlimport、source
88 0
|
存储 运维 Oracle
关于MogDB我所知的一切(一)
关于MogDB我所知的一切
1186 0
关于MogDB我所知的一切(一)
|
SQL 存储 运维
关于MogDB我所知的一切(二)
关于MogDB我所知的一切
500 0
关于MogDB我所知的一切(二)
|
NoSQL MongoDB
MongoDB Database Tools 安装实现备份、恢复
MongoDB Database Tools 安装实现备份、恢复
411 0
|
存储 运维 Kubernetes
【OushuDB】Oushu Database和Apache HAWQ的不同
全新执行引擎,充分利用硬件的所有特性,比Apache HAWQ性能高出5-10倍支持Update和Delete,以及索引C++可插拔外部存储替换JAVA PXF,性能高数倍,无需安装部署PXF额外组件,极大简化了用户安装部署和运维原生支持CSV/TEXT外部存储可以用于不同集群之间共享数据,比如数据仓库和集市之间共享及传输数据可以用于高速数据加载和数据导出可以实现高速备份和恢复可以实现可插拔文件系统:比如S3, Ceph等可以实现可插拔文件格式:比如ORC,Parquet等支持ORC/TEXT/CSV作为内部表格式,支持ORC作为外部表格式 (通过C++可插拔外部存储)对PaaS/CaaS云平
298 0
【OushuDB】Oushu Database和Apache HAWQ的不同
|
Web App开发 SQL Java