MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
简介: MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

架构图

5e84632f3893225f387324583d2a7121_20211028-54618ca6-bf75-4a5b-be66-8089957ba034.png

一、 配置安装MogDB和wal2json 插件

二、 启动KAFKA

 cd ./kafka/bin
 [root@mogdb-mdb-0003 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
 [root@mogdb-mdb-0003 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

三、 编辑配置文件

cat og.json

{ "name": "mtk-connector",    // The name of the connector when registered with a Kafka Connect service.
  "config": {
    "schema.include.list": "lee", // sycn schema 
    "plugin.name": "wal2json",     // plugin 
    "database.password": "Enmo@123", 
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", // kafka address
    "database.hostname": "192.168.1.11", // The address of the PostgreSQL server.
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // The name of this PostgreSQL connector class.
    "database.user": "lee",  // The name of the PostgreSQL user that has the required privileges.
    "slot.name": "wal2json1", // slot unique
    "snapshot.mode": "never",
    "database.server.name": "opengauss", // The logical name of the MogDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
    "database.port": "18000", // The port number of the PostgreSQL server.
    "tasks.max": "1",
    "table.include.list": "lee.test", // A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
    "database.dbname": "postgres"  // The name of the PostgreSQL database to connect to
  }
} 

四、 启动 DEBEZIUM 并且注册

1. 修改配置文件插件路径

cat /root/kafka/config/connect-distributed.properties
plugin.path=/root/kafka/connect

2. 启动DBZ程序

[root@mogdb-mdb-0003 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

3. 注册DBZ信息

curl -X "POST" "http://21.47.30.225:8083/connectors" -H 'content-type: application/json;charset=UTF-8' -d @og.json.kafka

4. 消费测试

  • 查看topic_name 源端要先插入一条数据
./kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 消费测试
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic OPENGAUSS.lee.test_og

5. 其他

  • 删除DBZ信息
curl -X "DELETE" "http://localhost:8083/connectors/mtk-connector1"
  • 删除slot
删除slot
postgres=# select slot_name from pg_get_replication_slots();
         slot_name         
---------------------------
 standby_192.168.1.12_26001 
 primary_192.168.1.12_26001 
 wal2json1                 
(3 rows)
postgres=# select * from pg_drop_replication_slot('wal2json1');
  • 查看offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --describe
  • 清除offset
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --group worker-oracle --topic opengauss5.lee.test --reset-offsets --to-earliest --execute
  • 删除topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.1:9092  --topic OPENGAUSS.lee.test_og

五、配置Datawork同步

1. 配置JAVA环境变量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME

2. 编辑 dataworker.properties

mtk.topicPattern=og // 无需修改
mtk.groupId=worker-oracle //不同进程不一样
mtk.bootstrapServer=127.0.0.1:9092 // kafka 地址
mtk.maxPollRecordsConfig=10000 最大拉取数量
mtk.maxPartitionFetchBytesConfig=104857600 最大拉取数据大小
mtk.source=MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123
mtk.target=MYSQL:192.168.1.12:3306:root:root:Enmo@123
mtk.includeSchema=lee
mtk.includeTable=lee.test
  • target 支持PostgreSQL MySQL Oracle openGauss MogDB

3. 启动dataworker

java -jar mtk-data-worker.jar

六、 同步程序移植到其他Kafka(可选)

1. 拷贝整个connect文件夹和配置文件

  • connect 文件夹
  • config/connect-distributed.properties
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/root/datawork/kafka/connect

七、消费到其他kafka(可选)

  • 修改DBZ配置文件
[root@v-21-47-30-225 config]#  grep -i bootstrap.servers  /root/kafka/config/connect-distributed.properties
# the `bootstrap.servers` and those specifying replication factors.
bootstrap.servers=192.168.1.1:9092
  • 注册信息
    cat og.json.kafka
{ "name": "og",
  "config": {
    "schema.include.list": "lee",
    "plugin.name": "wal2json",
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.hostname": "192.168.1.11",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "lee",
    "slot.name": "wal2json1",
    "snapshot.mode": "never",
    "database.server.name": "OPENGAUSS",
    "database.port": "26000",
    "tasks.max": "1",
    "table.include.list": "lee.test_OG",
    "database.dbname": "test"
  }
}

八、 部分日志截图

  • 同步日志
  • 5fb1f30a4f352be891d4cc4eda8fd3bf_20211028-048c91f9-d8e3-4581-9ea7-08dd29928357.png
目录
相关文章
|
Linux Android开发
CentOS7下安装Emby流媒体服务器
CentOS7下安装Emby流媒体服务器
3451 0
CentOS7下安装Emby流媒体服务器
|
JSON NoSQL MongoDB
MongoDB Compass的安装及使用图文说明(非常详细)
MongoDB Compass的安装及使用图文说明(非常详细)
4485 2
|
运维 关系型数据库 Java
DataKit6.0将MySQL8.0迁移至openGauss6.0
DataKit6.0将MySQL8.0迁移至openGauss6.0
|
存储 NoSQL 大数据
大数据中数据存储 (Data Storage)
【10月更文挑战第17天】
1547 2
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
3915 3
|
Oracle 安全 关系型数据库
如何在openGauss/PostgreSQL手动清理XLOG/WAL 文件?
openGauss/PostgreSQL中的预写式日志WAL(Write Ahead Log),又名Xlog或redo log,相当于oracle的online redo log, 不同的是oracle online redo log是提前创建几组滚动使用,但在opengauss中只需要本配置参数控制WAL日志的周期,数据库会一直的创建并自动清理,但存在一些情况WAL日志未清理导致目录空间耗尽,或目录空间紧张时手动删除wal日志时,比如如何确认在非归档模式下哪些WAL日志文件可以安全删除?
1596 0
|
监控 Cloud Native 测试技术
持续集成与持续交付(CI/CD)在云原生环境中的应用与优化
传统软件开发模式下的集成和交付流程往往繁琐且易出错,而随着云原生技术的快速发展,持续集成与持续交付(CI/CD)在云原生环境中的应用变得尤为重要。本文将探讨CI/CD在云原生环境中的应用及优化策略,包括自动化测试、容器化部署以及监控和反馈机制等方面,旨在帮助开发团队更好地应对云原生时代的挑战。
318 2
|
消息中间件 SQL JSON
Debezium Adapt openGauss
Debezium Adapt openGauss
431 0
|
存储 SQL JSON
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(一)
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(一)