架构图
一、 配置安装MogDB和wal2json 插件
- wal2json安装配置参考链接 Debezium Adapt openGauss (五、 openGauss install wal2json)
- wal2json 下载地址
- 数据库安装
二、 启动KAFKA
cd ./kafka/bin [root@mogdb-mdb-0003 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties [root@mogdb-mdb-0003 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
三、 编辑配置文件
cat og.json
{ "name": "mtk-connector", // The name of the connector when registered with a Kafka Connect service. "config": { "schema.include.list": "lee", // sycn schema "plugin.name": "wal2json", // plugin "database.password": "Enmo@123", "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", // kafka address "database.hostname": "192.168.1.11", // The address of the PostgreSQL server. "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // The name of this PostgreSQL connector class. "database.user": "lee", // The name of the PostgreSQL user that has the required privileges. "slot.name": "wal2json1", // slot unique "snapshot.mode": "never", "database.server.name": "opengauss", // The logical name of the MogDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used. "database.port": "18000", // The port number of the PostgreSQL server. "tasks.max": "1", "table.include.list": "lee.test", // A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring. "database.dbname": "postgres" // The name of the PostgreSQL database to connect to } }
四、 启动 DEBEZIUM 并且注册
1. 修改配置文件插件路径
cat /root/kafka/config/connect-distributed.properties plugin.path=/root/kafka/connect
2. 启动DBZ程序
[root@mogdb-mdb-0003 bin]# ./connect-distributed.sh ../config/connect-distributed.properties
3. 注册DBZ信息
curl -X "POST" "http://21.47.30.225:8083/connectors" -H 'content-type: application/json;charset=UTF-8' -d @og.json.kafka
4. 消费测试
- 查看topic_name 源端要先插入一条数据
./kafka-topics.sh --bootstrap-server localhost:9092 --list
- 消费测试
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic OPENGAUSS.lee.test_og
5. 其他
- 删除DBZ信息
curl -X "DELETE" "http://localhost:8083/connectors/mtk-connector1"
- 删除slot
删除slot postgres=# select slot_name from pg_get_replication_slots(); slot_name --------------------------- standby_192.168.1.12_26001 primary_192.168.1.12_26001 wal2json1 (3 rows) postgres=# select * from pg_drop_replication_slot('wal2json1');
- 查看offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group worker-oracle --describe
- 清除offset
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --group worker-oracle --topic opengauss5.lee.test --reset-offsets --to-earliest --execute
- 删除topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.1:9092 --topic OPENGAUSS.lee.test_og
五、配置Datawork同步
1. 配置JAVA环境变量
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME
2. 编辑 dataworker.properties
mtk.topicPattern=og // 无需修改 mtk.groupId=worker-oracle //不同进程不一样 mtk.bootstrapServer=127.0.0.1:9092 // kafka 地址 mtk.maxPollRecordsConfig=10000 最大拉取数量 mtk.maxPartitionFetchBytesConfig=104857600 最大拉取数据大小 mtk.source=MOGDB:192.168.1.11:18000:postgres:lee:Enmo@123 mtk.target=MYSQL:192.168.1.12:3306:root:root:Enmo@123 mtk.includeSchema=lee mtk.includeTable=lee.test
- target 支持PostgreSQL MySQL Oracle openGauss MogDB
3. 启动dataworker
java -jar mtk-data-worker.jar
六、 同步程序移植到其他Kafka(可选)
1. 拷贝整个connect文件夹和配置文件
- connect 文件夹
- config/connect-distributed.properties
task.shutdown.graceful.timeout.ms=10000 offset.flush.timeout.ms=5000 internal.key.converter=org.apache.kafka.connect.json.JsonConverter plugin.path=/root/datawork/kafka/connect
七、消费到其他kafka(可选)
- 修改DBZ配置文件
[root@v-21-47-30-225 config]# grep -i bootstrap.servers /root/kafka/config/connect-distributed.properties # the `bootstrap.servers` and those specifying replication factors. bootstrap.servers=192.168.1.1:9092
- 注册信息
cat og.json.kafka
{ "name": "og", "config": { "schema.include.list": "lee", "plugin.name": "wal2json", "database.password": "Enmo@123", "database.history.kafka.bootstrap.servers": "192.168.1.1:9092", "database.hostname": "192.168.1.11", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.user": "lee", "slot.name": "wal2json1", "snapshot.mode": "never", "database.server.name": "OPENGAUSS", "database.port": "26000", "tasks.max": "1", "table.include.list": "lee.test_OG", "database.dbname": "test" } }
八、 部分日志截图
- 同步日志