一、MongoShake
1.1MongoShake简介
MongoShake是一个以go语言编写的通用的平台型服务,通过读取MongoDB集群的Oplog日志,对MongoDB的数据进行复制,后续通过操作日志实现特定需求。
MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。现有tunnel通道类型有:
- Direct:直接写入目的MongoDB
- RPC:通过net/rpc方式连接
- TCP:通过tcp方式连接
- File:通过文件方式对接
- Kafka:通过Kafka方式对接
- Mock:用于测试,不写入tunnel,抛弃所有数据
消费者可以通过对接tunnel通道获取关注的数据,例如对接Direct通道直接写入目的MongoDB,或者对接RPC进行同步数据传输等。此外,用户还可以自己创建自己的API进行灵活接入。下面2张图给出了基本的架构和数据流。
MongoShake对接的源数据库支持单个mongod,replica set和sharding三种模式。目的数据库支持mongod和mongos。如果源端数据库为replica set,建议使用备库以减少主库的压力;如果为sharding模式,那么每个shard都将对接到MongoShake并进行并行抓取。对于目的库来说,可以对接多个mongos,不同的数据将会哈希后写入不同的mongos。
1.2 应用场景
- 从MongoDB副本集同步到MongoDB副本集
- 从MongoDB副本集同步到MongoDB集群版
- 从MongoDB集群版同步到MongoDB集群版
- 从MongoDB副本集同步到kafka通道
- 云上MongoDB副本集的双向同步(自建不支持,云MongoDB内核有调整:在oplog中加入uk字段,标识涉及到的唯一索引信息)
1.3 基本特性
- 并行复制
MongoShake提供了并行复制的能力,复制的粒度选项(shard_key)可以为:id,collection或者auto,不同的文档或表可能进入不同的哈希队列并发执行。id表示按文档进行哈希;collection表示按表哈希;auto表示自动配置,如果有表存在唯一键,则退化为collection,否则等价于id。
按表哈希可以保证一个表内的操作的顺序一致性,但不能保证不同表之间的顺序一致性;按文档哈希可以保证一个表内对同一个文档(主键_id)的操作的顺序一致性,但不能保证对不同文档操作的顺序一致性。
- HA方案
MongoShake定期将同步上下文进行存储,存储对象可以为第三方API或者源库。目前的上下文内容为“已经成功同步的oplog时间戳”。在这种情况下,当服务切换或者重启后,通过对接该API或者数据库,新服务能够继续提供服务。
此外,MongoShake还提供了Hypervisor机制用于在服务挂掉的时候,将服务重新拉起。
- 过滤
提供黑名单和白名单机制选择性同步db和collection。
- 压缩
支持oplog在发送前进行压缩,目前支持的压缩格式有gzip, zlib, 或deflate。
- Checkpoint
Checkpoint是用于标识同步的位点信息,比如checkpoint="2018-01-01 12:34"标识已经同步到了"2018-01-01 12:34"这个位点了,那么这个时候如果MongoShake异常退出,那么下次重启可以继续从"2018-01-01 12:34"开始拉取,而不是从头开始。下面介绍具体实现原理:
MongShake采用了ACK机制确保oplog成功回放,如果失败将会引发重传,传输重传的过程类似于TCP的滑动窗口机制。这主要是为了保证应用层可靠性而设计的,比如解压缩失败等等。为了更好的进行说明,先来定义几个名词:
LSN(Log Sequence Number),表示已经传输的最新的oplog序号。
LSN_ACK(Acked Log Sequence Number),表示已经收到ack确认的最大LSN,即写入tunnel成功的LSN。
LSN_CKPT(Checkpoint Log Sequence Number),表示已经做了checkpoint的LSN,即已经持久化的LSN。
LSN、LSN_ACK和LSN_CKPT的值均来自于Oplog的时间戳ts字段,其中隐含约束是:LSN_CKPT<=LSN_ACK<=LSN。
如上图所示,LSN=16表示已经传输了16条oplog,如果没有重传的话,下次将传输LSN=17;LSN_ACK=13表示前13条都已经收到确认,如果需要重传,最早将从LSN=14开始;LSN_CKPT=8表示已经持久化checkpoint=8。持久化的意义在于,如果此时MongoShake挂掉重启后,源数据库的oplog将从LSN_CKPT位置开始读取而不是从头LSN=1开始读。因为oplog DML的幂等性,同一数据多次传输不会产生问题。但对于DDL,重传可能会导致错误。
- 索引、DDL同步优化 & 4.0事务支持
从v1.5版本开始,MongoShake优化了DDL语句,保证了正确性。其基本原理是通过添加全局barrier,一旦发现oplog为DDL语句或者索引,那么会等待这条oplog成功写入目的端并更新checkpoint后,才会放开后续的同步。对于DML语句的同步,还是延用之前的并发模式。在以下非常极端的情况下,可能会存在报错,需要运维介入解决:目的端已经写入DDL但是checkpoint还没更新,这个时候MongoShake挂了,那么重启之后这条DDL重传写入将会导致报错。
同样从v1.5版本,MongoShake对事务语句进行了支持。
- 全量同步
从v1.5版本开始,MongoShake支持全量同步,有3种模式可选:全量同步+增量同步,只全量同步,只增量同步。为了保证高效性,内部同样采用并发处理。
- 排障和限速
MongoShake对外提供Restful API,提供实时查看进程内部各队列数据的同步情况,便于问题排查。另外,还提供限速功能,方便用户进行实时控制,减轻数据库压力。
二、方案
2.1 多活方案
在开源MongoDB下,可以根据控制流量分发来达到多活的需求。比如下面这个图,需要通过proxy进行流量分发,比如对a, b库的写操作分发到左边的MongoDB,对c库的写操作分发到右边的MongoDB,源库到目的库的MongoShake链路只同步a, b库(MongoShake提供按db、collection的过滤功能),目的库到源库的MongoShake链路只同步c库。这样就解决了环形复制的问题。
2.2 容灾方案
MongoShake搭建了异地容灾链路。用户在2个机房分别部署了2套应用,正常情况下,用户流量通过DNS/SLB只访问主应用,然后再访问到主MongoDB,数据通过MongoShake在2个机房的数据库之间进行同步,一旦机房1不可用,DNS/SLB将用户流量切换到备上,然后继续对外提供读写服务。
三、验证
3.1 环境介绍
本实验采用两套单节点副本集作为源端和一套单节点副本集做为目的端进行测试,使用的MongoShake版本为2.4.1。(多节点在配置上只需添加对应节点配置即可)
- MongoShake1:single1 test1>single3
- MongoShake1:single1 test2>single2
- MongoShake1back:single3 test1>single1
- MongoShake2back:single3 test2>single2
名称 | 版本 | 端口 |
---|---|---|
single1 | 4.2.3 | 27017 |
single2 | 4.2.3 | 27018 |
single3 | 4.2.3 | 27019 |
MongoShake1 | 2.4.1 | 9101、9102、9103 |
MongoShake2 | 2.4.1 | 9201、9202、9203 |
MongoShake1back | 2.4.1 | 9301、9302、9303 |
MongoShake2back | 2.4.1 | 9401、9402、9403 |
3.2 功能验证
3.2.1 多活验证
#single1配置文件
conf.version = 2
id = mongoshake1
master_quorum = false
full_sync.http_port = 9101
incr_sync.http_port = 9102
system_profile_port = 9103
log.level = info
log.dir =/usr/local/mongo-shake/single1
log.file = collector.log
log.flush = false
sync_mode = all
mongo_urls = mongodb://root:970125@127.0.0.1:27017
mongo_cs_url =
mongo_s_url =
tunnel = direct
tunnel.address = mongodb://root:970125@127.0.0.1:27019
tunnel.message = raw
mongo_connect_mode = secondaryPreferred
filter.namespace.black =
filter.namespace.white =test1
filter.pass.special.db =
filter.ddl_enable = true
checkpoint.storage.url =
checkpoint.storage.db = mongoshake
checkpoint.storage.collection = ckptone
checkpoint.start_position = 1970-01-01T00:00:00Z
transform.namespace =
full_sync.reader.collection_parallel = 6
full_sync.reader.write_document_parallel = 8
full_sync.reader.document_batch_size = 128
full_sync.collection_exist_no_drop = false
full_sync.create_index = background
full_sync.executor.insert_on_dup_update = true
full_sync.executor.filter.orphan_document = false
full_sync.executor.majority_enable = false
incr_sync.mongo_fetch_method = oplog
incr_sync.oplog.gids =
incr_sync.shard_key = auto
incr_sync.worker = 8
incr_sync.worker.oplog_compressor = none
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256
incr_sync.executor.upsert = false
incr_sync.executor.insert_on_dup_update = true
incr_sync.conflict_write_to = none
incr_sync.executor.majority_enable = false
#single3配置文件
conf.version = 2
id = mongoshake1back
master_quorum = false
full_sync.http_port = 9301
incr_sync.http_port = 9302
system_profile_port = 9303
log.level = info
log.dir =/usr/local/mongo-shake/single1back
log.file = collector.log
log.flush = false
sync_mode = all
mongo_urls = mongodb://root:970125@127.0.0.1:27019
mongo_cs_url =
mongo_s_url =
tunnel = direct
tunnel.address = mongodb://root:970125@127.0.0.1:27017
tunnel.message = raw
mongo_connect_mode = secondaryPreferred
filter.namespace.black =
filter.namespace.white =test2
filter.pass.special.db =
filter.ddl_enable = true
checkpoint.storage.url =
checkpoint.storage.db = mongoshake
checkpoint.storage.collection = ckptoneback
checkpoint.start_position = 1970-01-01T00:00:00Z
transform.namespace =
full_sync.reader.collection_parallel = 6
full_sync.reader.write_document_parallel = 8
full_sync.reader.document_batch_size = 128
full_sync.collection_exist_no_drop = false
full_sync.create_index = background
full_sync.executor.insert_on_dup_update = true
full_sync.executor.filter.orphan_document = false
full_sync.executor.majority_enable = false
incr_sync.mongo_fetch_method = oplog
incr_sync.oplog.gids =
incr_sync.shard_key = auto
incr_sync.worker = 8
incr_sync.worker.oplog_compressor = none
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256
incr_sync.executor.upsert = false
incr_sync.executor.insert_on_dup_update = true
incr_sync.conflict_write_to = none
incr_sync.executor.majority_enable = false
#开启双向同步任务
[root@zijie mongo-shake]# ./start.sh mongoshake1.conf
[root@zijie mongo-shake]# ./start.sh mongoshake1back.conf
#single1
single1:PRIMARY> use test1
switched to db test1
single1:PRIMARY> db.t1.insert({"id":1})
WriteResult({ "nInserted" : 1 })
single1:PRIMARY> db.t1.insert({"id":2})
WriteResult({ "nInserted" : 1 })
single1:PRIMARY> show dbs
admin 0.000GB
config 0.000GB
local 0.001GB
mongoshake 0.000GB
test1 0.000GB
test2 0.000GB
single1:PRIMARY> use test2
switched to db test2
single1:PRIMARY> db.t2.find()
{ "_id" : ObjectId("5e9dbe6e115d93488122f909"), "id" : 1 }
{ "_id" : ObjectId("5e9dbe70115d93488122f90a"), "id" : 2 }
#single3
single3:PRIMARY> use test1
switched to db test1
single3:PRIMARY> db.t1.find()
{ "_id" : ObjectId("5e9dbe203d06ebef5e549110"), "id" : 1 }
{ "_id" : ObjectId("5e9dbe243d06ebef5e549111"), "id" : 2 }
single3:PRIMARY> use test2
switched to db test2
single3:PRIMARY> db.t2.insert({"id":1})
WriteResult({ "nInserted" : 1 })
single3:PRIMARY> db.t2.insert({"id":2})
WriteResult({ "nInserted" : 1 })
3.2.2 容灾验证
#single1配置文件
conf.version = 2
id = mongoshake1
master_quorum = false
full_sync.http_port = 9101
incr_sync.http_port = 9102
system_profile_port = 9103
log.level = info
log.dir =/usr/local/mongo-shake/single1
log.file = collector.log
log.flush = false
sync_mode = all
mongo_urls = mongodb://root:970125@127.0.0.1:27017
mongo_cs_url =
mongo_s_url =
tunnel = direct
tunnel.address = mongodb://root:970125@127.0.0.1:27019
tunnel.message = raw
mongo_connect_mode = secondaryPreferred
filter.namespace.black =
filter.namespace.white =test1
filter.pass.special.db =
filter.ddl_enable = true
checkpoint.storage.url =
checkpoint.storage.db = mongoshake
checkpoint.storage.collection = ckptone
checkpoint.start_position = 1970-01-01T00:00:00Z
transform.namespace =
full_sync.reader.collection_parallel = 6
full_sync.reader.write_document_parallel = 8
full_sync.reader.document_batch_size = 128
full_sync.collection_exist_no_drop = false
full_sync.create_index = background
full_sync.executor.insert_on_dup_update = true
full_sync.executor.filter.orphan_document = false
full_sync.executor.majority_enable = false
incr_sync.mongo_fetch_method = oplog
incr_sync.oplog.gids =
incr_sync.shard_key = auto
incr_sync.worker = 8
incr_sync.worker.oplog_compressor = none
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256
incr_sync.executor.upsert = false
incr_sync.executor.insert_on_dup_update = true
incr_sync.conflict_write_to = none
incr_sync.executor.majority_enable = false
#single2配置文件
conf.version = 2
id = mongoshake2
master_quorum = false
full_sync.http_port = 9201
incr_sync.http_port = 9202
system_profile_port = 9203
log.level = info
log.dir =/usr/local/mongo-shake/single2
log.file = collector.log
log.flush = false
sync_mode = all
mongo_urls = mongodb://root:970125@127.0.0.1:27018
mongo_cs_url =
mongo_s_url =
tunnel = direct
tunnel.address = mongodb://root:970125@127.0.0.1:27019
tunnel.message = raw
mongo_connect_mode = secondaryPreferred
filter.namespace.black =
filter.namespace.white =test2
filter.pass.special.db =
filter.ddl_enable = true
checkpoint.storage.url =
checkpoint.storage.db = mongoshake
checkpoint.storage.collection = ckpttwo
checkpoint.start_position = 1970-01-01T00:00:00Z
transform.namespace =
full_sync.reader.collection_parallel = 6
full_sync.reader.write_document_parallel = 8
full_sync.reader.document_batch_size = 128
full_sync.collection_exist_no_drop = false
full_sync.create_index = background
full_sync.executor.insert_on_dup_update = true
full_sync.executor.filter.orphan_document = false
full_sync.executor.majority_enable = false
incr_sync.mongo_fetch_method = oplog
incr_sync.oplog.gids =
incr_sync.shard_key = auto
incr_sync.worker = 8
incr_sync.worker.oplog_compressor = none
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256
incr_sync.executor.upsert = false
incr_sync.executor.insert_on_dup_update = true
incr_sync.conflict_write_to = none
incr_sync.executor.majority_enable = false
#single1back配置文件
conf.version = 2
id = mongoshake1back
master_quorum = false
full_sync.http_port = 9301
incr_sync.http_port = 9302
system_profile_port = 9303
log.level = info
log.dir =/usr/local/mongo-shake/single1back
log.file = collector.log
log.flush = false
sync_mode = incr
mongo_urls = mongodb://root:970125@127.0.0.1:27019
mongo_cs_url =
mongo_s_url =
tunnel = direct
tunnel.address = mongodb://root:970125@127.0.0.1:27017
tunnel.message = raw
mongo_connect_mode = secondaryPreferred
filter.namespace.black =
filter.namespace.white =test1
filter.pass.special.db =
filter.ddl_enable = true
checkpoint.storage.url =
checkpoint.storage.db = mongoshake
checkpoint.storage.collection = ckptoneback
checkpoint.start_position = 1970-01-01T00:00:00Z
transform.namespace =
full_sync.reader.collection_parallel = 6
full_sync.reader.write_document_parallel = 8
full_sync.reader.document_batch_size = 128
full_sync.collection_exist_no_drop = false
full_sync.create_index = background
full_sync.executor.insert_on_dup_update = true
full_sync.executor.filter.orphan_document = false
full_sync.executor.majority_enable = false
incr_sync.mongo_fetch_method = oplog
incr_sync.oplog.gids =
incr_sync.shard_key = auto
incr_sync.worker = 8
incr_sync.worker.oplog_compressor = none
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256
incr_sync.executor.upsert = false
incr_sync.executor.insert_on_dup_update = true
incr_sync.conflict_write_to = none
incr_sync.executor.majority_enable = false
#single2back配置文件
conf.version = 2
id = mongoshake2back
master_quorum = false
full_sync.http_port = 9401
incr_sync.http_port = 9402
system_profile_port = 9403
log.level = info
log.dir =/usr/local/mongo-shake/single2back
log.file = collector.log
log.flush = false
sync_mode = incr
mongo_urls = mongodb://root:970125@127.0.0.1:27019
mongo_cs_url =
mongo_s_url =
tunnel = direct
tunnel.address = mongodb://root:970125@127.0.0.1:27018
tunnel.message = raw
mongo_connect_mode = secondaryPreferred
filter.namespace.black =
filter.namespace.white =test2
filter.pass.special.db =
filter.ddl_enable = true
checkpoint.storage.url =
checkpoint.storage.db = mongoshake
checkpoint.storage.collection = ckpttwoback
checkpoint.start_position = 1970-01-01T00:00:00Z
transform.namespace =
full_sync.reader.collection_parallel = 6
full_sync.reader.write_document_parallel = 8
full_sync.reader.document_batch_size = 128
full_sync.collection_exist_no_drop = false
full_sync.create_index = background
full_sync.executor.insert_on_dup_update = true
full_sync.executor.filter.orphan_document = false
full_sync.executor.majority_enable = false
incr_sync.mongo_fetch_method = oplog
incr_sync.oplog.gids =
incr_sync.shard_key = auto
incr_sync.worker = 8
incr_sync.worker.oplog_compressor = none
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256
incr_sync.executor.upsert = false
incr_sync.executor.insert_on_dup_update = true
incr_sync.conflict_write_to = none
incr_sync.executor.majority_enable = false
#开启single1、single2到single3
#single1
single1:PRIMARY> use test1
switched to db test1
single1:PRIMARY> db.t1.insert({"id":1})
WriteResult({ "nInserted" : 1 })
single1:PRIMARY> db.t1.insert({"id":2})
WriteResult({ "nInserted" : 1 })
#single2
single2:PRIMARY> use test2
switched to db test2
single2:PRIMARY> db.t2.insert({"id":1})
WriteResult({ "nInserted" : 1 })
single2:PRIMARY> db.t2.insert({"id":2})
WriteResult({ "nInserted" : 1 })
#single3
single3:PRIMARY> use test1
switched to db test1
single3:PRIMARY> db.t1.find()
{ "_id" : ObjectId("5e9dc3fd3d06ebef5e549112"), "id" : 1 }
{ "_id" : ObjectId("5e9dc3ff3d06ebef5e549113"), "id" : 2 }
single3:PRIMARY> use test2
switched to db test2
single3:PRIMARY> db.t2.find()
{ "_id" : ObjectId("5e9dc43b77d631fe2074e4c3"), "id" : 1 }
{ "_id" : ObjectId("5e9dc43d77d631fe2074e4c4"), "id" : 2 }
#应用停写,停止同步任务,将应用切到目的端
#开启反向同步任务
[root@zijie mongo-shake]# ./start.sh mongoshake1back.conf
[root@zijie mongo-shake]# ./start.sh mongoshake2back.conf
#single3插入反向同步数据
single3:PRIMARY> use mongoshake
switched to db mongoshake
single3:PRIMARY> show tables;
ckptoneback
ckpttwoback
system.profile
single3:PRIMARY> use test1
switched to db test1
single3:PRIMARY> db.t1.insert({"id":3})
WriteResult({ "nInserted" : 1 })
single3:PRIMARY> db.t1.insert({"id":4})
WriteResult({ "nInserted" : 1 })
single3:PRIMARY> use test2
switched to db test2
single3:PRIMARY> db.t2.insert({"id":3})
WriteResult({ "nInserted" : 1 })
single3:PRIMARY> db.t2.insert({"id":4})
WriteResult({ "nInserted" : 1 })
#single1
single1:PRIMARY> db.t1.find()
{ "_id" : ObjectId("5e9dc3ff3d06ebef5e549113"), "id" : 2 }
{ "_id" : ObjectId("5e9dc3fd3d06ebef5e549112"), "id" : 1 }
{ "_id" : ObjectId("5e9dc6df115d93488122f90b"), "id" : 3 }
{ "_id" : ObjectId("5e9dc6e1115d93488122f90c"), "id" : 4 }
#single2
single2:PRIMARY> db.t2.find()
{ "_id" : ObjectId("5e9dc43d77d631fe2074e4c4"), "id" : 2 }
{ "_id" : ObjectId("5e9dc43b77d631fe2074e4c3"), "id" : 1 }
{ "_id" : ObjectId("5e9dc6e8115d93488122f90d"), "id" : 3 }
{ "_id" : ObjectId("5e9dc6ea115d93488122f90e"), "id" : 4 }
四、问题总结
- 权限
对于完全同步,MongoShake需要每个数据库的读取权限。对于增量,MongoShake需要local
数据库的读取权限和数据库的写入权限mongoshake
。 - 版本支持
MongoShake不支持3.0以下的MongoDB版本。 - MongoShake重启
sync.mode
是all
,重启后如果检查点存在且有效,这意味着最早的操作日志小于检查点,则MongoShake将仅运行增加同步。否则,MongoShake将再次运行完全同步,此后,将运行增加同步。因此,如果用户仍想运行完全同步但检查点存在,mongoshake.ckpt_default
则应手动删除检查点。 - MongoShake支持同步DDL
使用replayer.dml_only
选项。但是DDL不是幂等操作,一旦失败,oplog可能会重放,因此启用DDL在最新版本中可能会出现问题。 - 断点恢复
MongoShake支持基于检查点机制的断点恢复,每次启动时,它都会读取检查点,这是一个时间戳,指示已准备好重播多少数据。之后,它将从此时间戳开始从源中提取数据。因此重启时不会丢失数据。
- oplog一致性
mongoshake不支持oplog的严格一致性,当shard_key
是auto/collection
,mongoshake支持顺序一致性其中同一命名空间(装置ns
),该序列可以得到保证。如果shard_key
为id
,则mongoshake支持最终一致性。
- 监控
MongoShake提供了api(在配置中默认为9100http_profile
)来监控服务器方面的内部状态:
worker
:显示内部工作者状态,包括worker_id
,jobs_in_queue
,jobs_unack_buffer
,last_unack
,last_ack
,count
。sentinel
:显示前哨配置:(OplogDump
转储oplog日志,“ 0”表示无日志,“ 1”表示采样,“ 2”表示全部转储),DuplicatedDump
(如果启用则将重复的oplog写入日志),Pause
(如果出现以下情况,整个mongoshake同步将被暂停启用),TPS
(控制数据同步的速度)。repl
:显示总体状态:(logs_get
我们获得多少操作日志),logs_repl
(我们重播多少操作日志),logs_success
(我们成功重播多少操作日志),lsn
(最后发送),lsn_ack
(除0以外的所有工作队列中的最小ack值),lsn_ckpt
(检查点)now
,replset
,tag
,who
。conf
:显示配置。
可以使用curl
命令访问此端口。此外,提供了mongoshake-stat
脚本,通过以下实时方式通过静态API监控MongoShake。
./mongoshake-stat --port=9100
结果将每秒清除一次
logs_get
:一秒钟内我们获得了多少个oplog。
logs_repl
:我们在一秒钟内重播了多少oplog。
logs_success
:我们在一秒钟内成功重播了多少操作日志,即TPS。
- 双活
用户可以使用filter(filter.namespace.white
和filter.namespace.black
)来实现此功能。当前,过滤器的粒度为集合。
例如,我在一个名为mongodb的副本集中有三个数据库a
,b
而c
。假设源plicateSet为source-mongo
,目标副本集为target-mongo
,所以我们构建了两个MongoShakes 分别从source-mongo
和获取操作日志target-mongo
。在第一个MongoShake中,我们仅过滤数据库名称等于a
或b
在第二个MongoShake中我们仅过滤数据库名称c
。
用户可以使用自己的proxy
程序,数据分发,这样的写作a
,并b
会去到源数据库,同时c
转到目标。
- 数据校验
可以使用comparision.py
脚本进行验证,以从源数据库和目标数据库中获取并比较数据。但是请注意,它仅比较大纲信息,例如数据库编号,收集编号,文档编号,两边都存在相同的“ _id”。因此,如果{"_id":1, "a":1}
源数据库中有一个条目{"_id":1, "a":2}
,而目标数据库中有另一个条目,则此比较代码无法验证。
- 系统库同步
出于某些特殊原因,将数据写入“ admin”数据库,但是“ admin”数据库无法同步,如何将源MongoDB上“ admin”下的集合同步到目标MongoDB上的另一个集,filter.pass.special.db
以便“管理员”数据库也可以同步,但是用户应非常小心。这是我们的建议,假设用户要将“ admin.source”同步到“ users.target”:
- 设置
filter.pass.special.db = admin
为让“管理员”数据库通过。 - 设置
filter.namespace.white = admin.source
为在其他名称空间被过滤时让“ admin.source”通过。对于当前的v2.0.7版本,“用户”也应添加到白名单中。 - 设置
transform.namespace = admin.source:users.target
为将源中的“ admin.source”传递到目标上的“ users.target”。
参考文档:https://yq.aliyun.com/articles/603329、https://yq.aliyun.com/articles/719704