MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
简介: MogDB/openGauss 实时同步工具 —— MDB (MogDB Data Bridge)

架构图

5e84632f3893225f387324583d2a7121_20211028-54618ca6-bf75-4a5b-be66-8089957ba034.png

一、 配置安装MogDB和wal2json 插件

二、 启动KAFKA

 cd ./kafka/bin
 [root@mogdb-mdb-0003 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
 [root@mogdb-mdb-0003 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

三、 编辑配置文件

cat og.json

{ "name": "mtk-connector",    // The name of the connector when registered with a Kafka Connect service.
  "config": {
    "schema.include.list": "lee", // sycn schema 
    "plugin.name": "wal2json",     // plugin 
    "database.password": "Enmo@123", 
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", // kafka address
    "database.hostname": "192.168.1.11", // The address of the PostgreSQL server.
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // The name of this PostgreSQL connector class.
    "database.user": "lee",  // The name of the PostgreSQL user that has the required privileges.
    "slot.name": "wal2json1", // slot unique
    "snapshot.mode": "never",
    "database.server.name": "opengauss", // The logical name of the MogDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
    "database.port": "18000", // The port number of the PostgreSQL server.
    "tasks.max": "1",
    "table.include.list": "lee.test", // A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
    "database.dbname": "postgres"  // The name of the PostgreSQL database to connect to
  }
} 

四、 启动 DEBEZIUM 并且注册

1. 修改配置文件插件路径

cat /root/kafka/config/connect-distributed.properties
plugin.path=/root/kafka/connect

2. 启动DBZ程序

[root@mogdb-mdb-0003 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

3. 注册DBZ信息

curl -X "POST" "http://21.47.30.225:8083/connectors" -H 'content-type: application/json;charset=UTF-8' -d @og.json.kafka

4. 消费测试

  • 查看topic_name 源端要先插入一条数据
./kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 消费测试
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic OPENGAUSS.lee.test_og

5. 其他

  • 删除DBZ信息
curl -X "DELETE" "http://localhost:8083/connectors/mtk-connector1"
  • 删除slot
删除slot
postgres=# select slot_name from pg_get_replication_slots();
         slot_name         
---------------------------
 standby_192.168.1.12_26001 
 primary_192.168.1.12_26001 
 wal2json1                 
(3 rows)
postgres=# select * from pg_drop_replication_slot('wal2json1');
  • 查看offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --describe
  • 清除offset
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --group worker-oracle --topic opengauss5.lee.test --reset-offsets --to-earliest --execute
  • 删除topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.1:9092  --topic OPENGAUSS.lee.test_og

五、配置Datawork同步

1. 配置JAVA环境变量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME

2. 编辑 dataworker.properties

mtk.topicPattern=og // 无需修改
mtk.groupId=worker-oracle //不同进程不一样
mtk.bootstrapServer=127.0.0.1:9092 // kafka 地址
mtk.maxPollRecordsConfig=10000 最大拉取数量
mtk.maxPartitionFetchBytesConfig=104857600 最大拉取数据大小
mtk.source=MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123
mtk.target=MYSQL:192.168.1.12:3306:root:root:Enmo@123
mtk.includeSchema=lee
mtk.includeTable=lee.test
  • target 支持PostgreSQL MySQL Oracle openGauss MogDB

3. 启动dataworker

java -jar mtk-data-worker.jar

六、 同步程序移植到其他Kafka(可选)

1. 拷贝整个connect文件夹和配置文件

  • connect 文件夹
  • config/connect-distributed.properties
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/root/datawork/kafka/connect

七、消费到其他kafka(可选)

  • 修改DBZ配置文件
[root@v-21-47-30-225 config]#  grep -i bootstrap.servers  /root/kafka/config/connect-distributed.properties
# the `bootstrap.servers` and those specifying replication factors.
bootstrap.servers=192.168.1.1:9092
  • 注册信息
    cat og.json.kafka
{ "name": "og",
  "config": {
    "schema.include.list": "lee",
    "plugin.name": "wal2json",
    "database.password": "Enmo@123",
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.hostname": "192.168.1.11",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "lee",
    "slot.name": "wal2json1",
    "snapshot.mode": "never",
    "database.server.name": "OPENGAUSS",
    "database.port": "26000",
    "tasks.max": "1",
    "table.include.list": "lee.test_OG",
    "database.dbname": "test"
  }
}

八、 部分日志截图

  • 同步日志
  • 5fb1f30a4f352be891d4cc4eda8fd3bf_20211028-048c91f9-d8e3-4581-9ea7-08dd29928357.png
目录
相关文章
|
18天前
|
Web App开发 人工智能 自然语言处理
快速掌握Dify+Chrome MCP:打造网页操控AI助手
本文教你如何快速搭建一个能操作浏览器的AI助手:通过Dify和Chrome MCP结合,只需三步配置,即可实现自动填表、数据抓取和网页操控,无需编写代码,用自然语言就能指挥AI完成各类网页任务。
|
7月前
|
监控 安全 Ubuntu
Ubuntu(22.04)云主机SSH安全加固
通过上述步骤,你可以有效地加固Ubuntu 22.04云主机的SSH安全性。这些措施不仅能防止常见的攻击,还能提升整体服务器的安全性和稳定性。建议在实施这些安全加固措施后,定期检查系统日志和更新安全策略,以应对不断变化的安全威胁。
362 14
|
12月前
|
运维 关系型数据库 Java
DataKit6.0将MySQL8.0迁移至openGauss6.0
DataKit6.0将MySQL8.0迁移至openGauss6.0
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
3515 3
|
12月前
|
存储 算法 搜索推荐
数据结构--堆的深度解析
数据结构--堆的深度解析
|
监控 数据挖掘 数据安全/隐私保护
ERP系统中的资产管理与维护
【7月更文挑战第25天】 ERP系统中的资产管理与维护
454 2
|
Oracle 安全 关系型数据库
如何在openGauss/PostgreSQL手动清理XLOG/WAL 文件?
openGauss/PostgreSQL中的预写式日志WAL(Write Ahead Log),又名Xlog或redo log,相当于oracle的online redo log, 不同的是oracle online redo log是提前创建几组滚动使用,但在opengauss中只需要本配置参数控制WAL日志的周期,数据库会一直的创建并自动清理,但存在一些情况WAL日志未清理导致目录空间耗尽,或目录空间紧张时手动删除wal日志时,比如如何确认在非归档模式下哪些WAL日志文件可以安全删除?
1473 0
|
消息中间件 SQL JSON
Debezium Adapt openGauss
Debezium Adapt openGauss
394 0
|
缓存 NoSQL Java
Redisson—分布式集合
Redisson—分布式集合
260 1