MongoShake最佳实践

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
简介: mongoshake最佳实践,到底该怎么玩?

我们在去年开源了MongoShake,可以用于MongoDB的数据同步,满足用户多种需求,整体介绍文档参考这里。今天,我来主要介绍一下MongoShake的最佳实践,主要涵盖以下几部分:

  1. 从MongoDB副本集同步到MongoDB副本集
  2. 从MongoDB副本集同步到MongoDB集群版
  3. 从MongoDB集群版同步到MongoDB集群版
  4. 从MongoDB副本集同步到kafka通道
  5. 云上MongoDB副本集的双向同步
  6. 配置文件参数解释
  7. 其他问题Q&A说明。这部分我主要列了目前用户存在的主流问题,这些问题在github上的wiki上都已经有说明,不过是英文的:https://github.com/alibaba/MongoShake/wiki/FAQ

MongoShake的下载路径我们提供在了github上,点击可以下载不同版本的mongoshake:mongo-shake-x.y.z.tar.gz。目前MongoShake提供了配置文件启动的方式,启动的命令行:./collector.linux -conf=collector.conf,不同的平台请选择不同的二进制文件,如windows下是collector.windows。

0. 配置文件

所有的参数都列在了配置文件里面,目前配置文件较多,用户可能比较困惑,正常情况下用户默认就行,修改的参数只有少部分。
MongoShake支持MongoDB副本集和集群版的互相同步,还可以同步到kafka模式,下面列出了几种常见的同步模式。

1. 从MongoDB副本集同步到MongoDB副本集

假设源端是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端也是三副本:10.5.5.5:5005, 10.6.6.6:6006, 10.7.7.7:7007。同步模式是全量+增量同步。
则用户需要修改以下几个参数:

mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端连接串信息,逗号分隔不同的mongod
sync_mode = all # all 表示全量+增量,full表示仅全量,incr表示仅增量
incr_sync.tunnel.address = mongodb://username:password@10.5.5.5:5005, 10.6.6.6:6006, 10.7.7.7:7007 #目的端连接串信息,逗号分隔不同的mongod
incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。

2. 从MongoDB副本集同步到MongoDB集群版

假设源同样是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端是sharding,有多个mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。

mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端连接串信息,逗号分隔不同的mongod
sync_mode = all # all 表示全量+增量,document表示仅全量,oplog表示仅增量
incr_sync.tunnel.address = mongodb://username:password@20.1.1.1:2021;20.2.2.2:2022;20.3.3.3:3033 #目的端连接串信息,分号分割不同的mongos。也可以只配置部分,配置多个mongos可以做负载均衡写入。
incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。

3. 从MongoDB集群版同步到MongoDB集群版

假设源是2节点:节点1是10.1.1.1:1001, 10.1.1.2:2002, 10.1.1.3:3003;节点2是10.2.2.1:1001, 10.2.2.2:2002, 10.2.2.3:3003,mongos:10.1.1.10:1010。目的端是sharding,有多个mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。

mongo_urls = mongodb://username1:password1@10.1.1.1:1001,10.1.1.2:2002,10.1.1.3:3003;mongodb://username2:password2@10.2.2.1:1001,10.2.2.2:2002,10.2.2.3:3003 #源端连接串信息,逗号分隔同一个shard不同的mongod,分号分隔不同的shard。
mongo_cs_url = mongodb://username1:password1@10.5.5.5:5555,10.5.5.6:5556 # 如果源端是sharding,此处需要配置源端sharding的cs的地址
mongo_s_url = mongodb://username_s:password_s@10.1.1.10:1010 # 如果希望采用change stream拉取,则还需要配置mongos的地址,一个就够了
sync_mode = all # all 表示全量+增量,document表示仅全量,oplog表示仅增量
incr_sync.tunnel.address = mongodb://username:password@20.1.1.1:2021;20.2.2.2:2022;20.3.3.3:3033 #目的端连接串信息,分号分割不同的mongos。
incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。

4. 从MongoDB副本集同步到kafka通道

假设源同样是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的kafka是50.1.1.1:6379,topic是test。

mongo_urls = mongodb://username:password@10.1.1.1:1001,10.2.2.2:2002,10.3.3.3:3003 #源端连接串信息,逗号分隔不同的mongod
sync_mode = oplog # 如果目的端不是mongodb,仅支持增量同步模式
incr_sync.tunnel = kafka
incr_sync.tunnel.address = test@50.1.1.1:6379
incr_sync.mongo_fetch_method = oplog # 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。

5. 云上MongoDB副本集的双向同步

云上副本集的双向同步可以参考副本集的单向同步,但是需要注意的有以下几点,假设A和B之间双向同步:

  1. 需要搭建2个mongoshake,一个从A到B,另一个从B到A
  2. 两条mongoshake不能同时用全量+增量(all)模式,正常应该是一个库为空(假设B),另一个有数据。那么从A到B先发起一次全量+增量同步,等待全量同步完毕以后,再从B到A发起一次增量同步。
  3. 双向同步需要依赖gid的开启,这个可以联系烛昭(通过售后联系),开启gid将会重启实例造成秒级别闪断。
  4. gid用于记录数据的产生地,比如从A产生的数据导入到B以后,不会被再导入回A,这样就不会产生环形复制。需要注意的是,这个gid只能用于增量,这也是第2条为什么一个方向通道是全量+增量,另一个方向通道需要搭建增量的原因。
  5. 云下开源的mongodb不能使用双向同步,因为gid的修改是在内核里面,所以开源不支持。
  6. sharding同样也支持双向同步

6. 部分配置文件参数解释

v2.4开始的参数请参考github wiki配置参数说明,下面是2.2及之前的参数。具体请查看配置文件的注释,此处只做简单解释

  • mongo_urls: 源mongodb的连接地址
  • mongo_connect_mode: 源端连接的模式,有几种模式可选:从seconary拉取;从primary拉取;secondary优先拉取;单节点拉取
  • sync_mode: sync模式,有几种模式可选:全量,增量,全量+增量
  • http_profile: 提供restful接口,用户可以查看一些内部运行情况,也可以对接监控。
  • system_profile: profile端口,可以查看进程运行的堆栈情况。
  • log: log日志相关参数。
  • filter.namespace.black: 黑名单过滤。黑名单内的库表不会被同步,剩下的同步。
  • filter.namespace.white: 白名单过滤。白名单内的库表会被同步,剩下的过滤掉。黑白名单最多只能配置一个,不配置会同步所有库表。
  • filter.pass.special.db: 有些特别的库表会被过滤,如admin,local, config库,如果一定要开启,可以在这里进行配置。
  • oplog.gids: 用于云上双向同步。
  • shard_key: 内部对数据多线程的哈希方式,默认collection表示按表级别进行哈希。
  • worker: 增量阶段并发写入的线程数,如果增量阶段性能不够,可以提高这个配置。
  • worker内部相关配置: worker.batch_queue_size, adaptive.batching_max_size, fetcher.buffer_capacity, 关于内部队列的相关配置,具体请参考github wiki文档。
  • worker.oplog_compressor: 压缩模式,如果是非direct模式开启这个可以减少网络传输的开销。
  • tunnel.address: 目的端对接的地址。
  • context.storage: checkpoint存储的位置,database表示把数据存入MongoDB,api表示把数据存入用户自己提供的http接口。
  • context.storage.url: checkpoint写入到哪个MongoDB,如果源是sharding,此处配置cs地址,checkpoint会写入admin库;如果是副本集,不配置,会默认写入源库,配置则写入配置的库里面。
  • context.address: checkpoint写入的表的名字。
  • context.start_position: checkpoint启动开始拉取的增量时间位点。如果本身checkpoint已经存在(参考上述context的位置),那么则按照context信息进行拉取,如果不存在,则按照这个位点进行增量拉取。
  • master_quorum: 如果以主备模式拉取同一个源,则这个参数需要启用。
  • transform.namespace: 命名空间的转换,a.b:c.d表示把源端a库下面的c表同步到目的端c库下面的d表。
  • replayer.dml_only: 默认不同步DDL,false表示同步DDL。DDL包括建表,删库,建索引等语句。
  • replayer.executor.upsert: 目的端如果update语句对应的主键id不存在,是否将update语句更改为insert语句。
  • replayer.executor.insert_on_dup_update: 目的端如果insert语句对应的主键id已经存在,是否将insert语句更改为update语句。
  • replayer.conflict_write_to: 对于写入冲突的情况,是否需要记录冲突的文档。
  • replayer.durable: 测试选项,false表示取消写入,只用于拉取调试。
  • replayer.collection_parallel: 全量同步按表并发的并发度。
  • replayer.document_parallel: 全量同步同一个表内并发写入的线程数。
  • replayer.document_batch_size: 全量同步一次性batch的大小。
  • replayer.collection_drop: 如果目的库表存在,是否先删除目的库再进行同步。

7. Q&A说明

此处记载几个常见的问题:

Q. mongoshake是否会同步config, local, admin库

A: 不会同步。如果用户一定要把admin的库同步到别的,那么可以通过命名转换功能(配置transform.namespace)把admin库同步到别的数据库,同时配置filter.pass.special.db参数:

  1. filter.pass.special.db = admin
  2. transform.namespace = admin.abc:target.abc # 把admin库下面的abc同步到target库的abc

Q. 从MongoDB同步到MongoDB发现全量阶段目的端MongoDB压力过大怎么办?

A: 用户可以降低全量同步配置replayer的相关参数,以降低目的端压力。

Q. 从MongoDB同步到MongoDB发现全量阶段同步时间过久,怎么办?

A: 用户可以定位一下看看目的慢的原因是什么,正常情况下可以提高全量配置replayer的相关参数,以提高全量同步的性能。但也有可能,问题的瓶颈是源端/网络端/通道cpu,内存等压力过高,都是有可能的。

Q. 到底什么是checkpoint?

A: checkpoint是记录增量同步的位点情况,mongoshake的启动就是根据这个位点来进行的,比如我们已经同步到10点钟了,那么这个时候挂掉了,如果没有位点,那么只能从头开始拉,位点的意义就是在于断点续传。在mongoshake中,位点信息是以64位时间戳来标记的(ts字段,准确的说是32位时间戳+计数),如果发生挂掉重启,那么会从源库的oplog表中寻找这个位点,并从此开始往后进行同步。
到这里,用户可能会问,那么mongoshake里面位点是存储在哪里的?存储的位置就是取决于用户context的相关配置,以副本集为例,checkpoint是存储于源库的mongoshake库的ckpt_default表,其大概内容长这个样子:

rszz-4.0-2:PRIMARY> use mongoshake
switched to db mongoshake
rszz-4.0-2:PRIMARY> show collections
ckpt_default
rszz-4.0-2:PRIMARY> db.ckpt_default.find()
{ "_id" : ObjectId("3d75e8b872d91278c3be0cc9"), "name" : "rszz-4.0-2", "ckpt" : Timestamp(1566556865, 196096) }

其中ckpt对应的field就是checkpoint。
用户可能还会问,那么mongoshake的checkpoint是怎么存储的,什么时候会存储?答案是,checkpoint是定期存储的,如果有数据同步,那么checkpoint的更新会很频繁(秒级别);如果没有数据同步,比如这个时候用户源端就是没有写入,那么默认是分钟级别更新一次心跳checkpoint。假设mongoshake数据位点记录是10:00:00,但是数据已经同步到10:00:10,这个时候mongoshake挂掉了,下次会从10:00:00重新开始同步,由于DML本身具有幂等性,数据的重复回放是OK的。那么用户可能会问,DDL怎么办?参考下面问题。

Q. 如何同步DDL?DDL能保证幂等性吗?

A: 设置replayer.dml_only = false可以同步DDL。DDL不能保证幂等性。在mongoshake里面,如果发现是DDL语句,会卡一个全局的barrier,让当前DDL语句同步,然后再卡一个全局barrier,等当前DDL语句同步完成,再接着并发同步接下来的oplog,barrier的卡锁伴随着checkpoint的强制刷新。但是由于DDL同步和checkpoint的刷新并不是一个院子操作,如果用户恰好在同步DDL完成和checkpoint刷新之间,进程挂掉了,那么没有办法,重启以后肯定会持续报错,用户需要手动运维解决,比如跳过这个DDL,或者目的端近些一个反向操作(原来DDL是建库,需要进行删除库;原来是建索引,需要进行删索引操作)。但是,这个概率很低。

Q. mongoshake碰到同步出错的情况,会跳过这个错误,继续同步吗?

A: 不会,会持续报错,用户需要关注日志的运行情况。

Q. 除了日志如何知道目前mongoshake的同步情况?

A: mongoshake有提供restful接口,可以监控mongoshake的内部同步情况,参考wiki: How to monitor the MongoShake,正常情况下lsn_ckpt.time会一直增长,如果这个数值不增长了,那么就表示同步出现问题了。

Q. 全量同步是否会同步索引?

A: 2.1以下版本,目前会同步索引,先是全量同步,全量同步完毕后会同步索引,后面版本我们会考虑加入开关,全量同步完毕可以选择性是否同步索引。接着是增量同步,增量同步阶段,设置replayer.dml_only = true不会同步索引,false的话会同步索引。

Q. 选择all模式同步,如果挂掉以后重启,是否会重新发起一次全量同步?进入增量阶段,是否要修改为oplog增量模式,然后再重启?

A: 显示全量同步,全量同步完毕会写入checkpoint,然后进行增量同步,增量同步阶段会不断更新checkpoint。对于mongoshake来说,是否进行全量同步取决于checkpoint,如果checkpoint存在且合法,那么只会进入增量同步;否则,会先进行全量同步,然后是增量。那么checkpoint怎么知道是存在的,就是context里面的参数的位置决定;那么怎么知道checkpoint是合法的?如果最老的oplog的时间位点小于checkpoint,那么就是合法的,证明增量能接上。

Q. 我的数据量很大,几百G/几个T/几十个T,all模式同步有什么要注意的地方?

A: 最需要注意的就是,全量同步完毕,增量能否接上。mongoshake是根据oplog的位点来决定全量结束是否要进行增量,如果全量花了2天,但是oplog表只能存储1天,那么全量同步完毕以后mongoshake就会报错退出,这是因为增量的位点丢失了。所以如果全量很大,同步时间势必很久,那么就需要放大oplog的表的大小以维持增量oplog。
其他还需要注意的就是压力问题,如何选取适当的参数应对对于源端/目的端的压力,减少对业务的影响。用户也可以决定从一个hidden结点,护着一个独立的secondary结点进行拉取。

Q. 同步出现xxx错误,怎么办?

A: 这个我在github的wiki基本都有总结,可以先看下github的faq。如果里面没有提到的,可以提交issue进行反馈。

Q. 目前mongoshake有校验工具吗?

A: 目前script下面有个comparison.py的全量校验脚本校验数据的一致性。那么如何进行增量的校验呢?目前还没有,敬请期待。

Q. sharding模式同步有啥需要注意的吗?

A: 如果源端是sharding,那么目前需要关闭balancer,在v2.1非稳定版本里面,我们支持了不关闭balancer,用户可以进行试用,但是目前这个是非稳定版本。此外源端如果是sharding,DDL的同步(replayer.dml_only = false)不能开启。

Q. 我把数据写入到kafka,怎么从kafka中拉取数据,发现直接拉取出现乱码

A: 请先看下wiki: https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-connect-to-different-tunnel-except-direct
kafka通道中的数据是有控制信息在里面的,用户不好直接剥离,可以用receiver进行对接。目前receiver对接后所有oplog都打印到控制台,用户也可以自己修改receiver代码,对接下游服务。当然,这个需要用户懂golang语言,这可能有一些学习成本在里面。后面我们会看看如何以更好的形式对接给用户。

Q. 我发现mongoshake建立以后对源端性能有所影响,出现了一些慢查询。

A: 目前全量阶段由于是扫表等操作,会对源端产生一些影响。增量阶段,正常情况影响很小,因为目前oplog找对应ts是二分模式。

Q. 是否可以把一个MongoDB同步到多个MongoDB?

A: 可以,需要启动多个mongoshake。对于不同的mongoshake,可以修改写入不同地方的checkpoint即可(比如表名修改一下),或者直接修改不同的collector.id。此外,需要注意的是,同一个目录下启动多个mongoshake,如果collector.id相同,是无法启动成功的。参考:https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-sync-data-from-one-mongodb-to-several-target-mongodb

目录
相关文章
|
6月前
|
存储 关系型数据库 MySQL
DataX: 阿里开源的又一款高效数据同步工具
DataX 是由阿里巴巴集团开源的一款大数据同步工具,旨在解决不同数据存储之间的数据迁移、同步和实时交换的问题。它支持多种数据源和数据存储系统,包括关系型数据库、NoSQL 数据库、Hadoop 等。 DataX 提供了丰富的数据读写插件,可以轻松地将数据从一个数据源抽取出来,并将其加载到另一个数据存储中。它还提供了灵活的配置选项和高度可扩展的架构,以适应各种复杂的数据同步需求。
|
17天前
|
缓存 监控 大数据
构建高可用AnalyticDB集群:最佳实践
【10月更文挑战第25天】在大数据时代,数据仓库和分析平台的高可用性变得尤为重要。作为阿里巴巴推出的一款完全托管的PB级实时数据仓库服务,AnalyticDB(ADB)凭借其高性能、易扩展和高可用的特点,成为众多企业的首选。本文将从我个人的角度出发,分享如何构建和维护高可用性的AnalyticDB集群,确保系统在各种情况下都能稳定运行。
25 0
|
6月前
|
关系型数据库 分布式数据库 PolarDB
开源PolarDB-X 部署安装全过程
本文介绍了开源PolarDB-X的部署安装步骤:首先,下载并解压PXD工具,配置至系统路径;然后,使用PXD工具进行部署,选择模式,输入参数,并等待部署完成;最后,验证集群状态。在遇到问题时,参考官方文档、社区支持或进行排查。建议包括优化文档、增强错误处理和建立用户反馈机制。
|
3月前
|
SQL 关系型数据库 MySQL
使用OceanBase进行大规模数据迁移的最佳实践
【8月更文第31天】随着业务的不断扩展,数据迁移成为了企业日常运营中不可避免的任务之一。对于那些正在从传统的数据库系统向分布式数据库系统过渡的企业来说,数据迁移尤为重要。OceanBase 是一个由阿里巴巴集团开发的高性能分布式关系数据库,它以其高可用性、水平扩展能力和成本效益而闻名。本文将探讨如何使用 OceanBase 进行大规模数据迁移,并提供相关的最佳实践和代码示例。
308 1
|
5月前
|
存储 监控 数据安全/隐私保护
数据迁移至云:最佳实践与工具
【6月更文挑战第1天】企业在数字化转型中选择将数据迁移至云以获取灵活性、降低成本及增强安全性。迁移前需详细规划,评估目标和需求,选择合适的云服务商。确保数据备份,利用工具如 AWS DataSync 自动化迁移,注意数据格式兼容性,并在迁移中监控、测试数据完整性。保障安全性,设置访问权限和加密。迁移后优化管理云资源,实现最佳性能和成本效益。遵循最佳实践,确保数据迁移顺利。
85 1
|
6月前
|
存储 关系型数据库 分布式数据库
【PolarDB开源】PolarDB高可用架构解析:确保业务连续性的关键设计
【5月更文挑战第22天】阿里云PolarDB是一款高可用、高性能的云原生数据库,采用分布式共享存储架构实现计算与存储分离。通过主从复制保证数据实时同步,当主节点故障时,从节点能快速接管。此外,PolarDB提供自动故障转移和数据备份恢复功能,确保业务连续性和数据安全性。一个简单的Python SDK使用示例展示了查询数据的过程。总之,PolarDB通过多种机制保障了企业在异常情况下的服务稳定和数据完整性。
268 5
|
6月前
|
Cloud Native 关系型数据库 OLAP
高效易用的数据同步:阿里云瑶池 Zero-ETL服务来啦!
在大数据时代,企业有着大量分散在不同系统和平台上的业务数据。OLTP数据库不擅长复杂数据查询,不具备全局分析视角等能力,而OLAP数据仓库擅长多表join,可实现多源汇集,因此需要将TP数据库的数据同步到AP数据仓库进行分析处理。传统的ETL流程面临资源成本高、系统复杂度增加、数据实时性降低等挑战。为了解决这些问题,阿里云瑶池数据库提供了Zero-ETL服务,可以快速构建业务系统(OLTP)和数据仓库(OLAP)之间的数据同步链路,将业务系统的数据自动进行提取并加载到数据仓库,从而一站式完成数据同步和管理,实现事务处理和数据分析一体化,帮助客户专注于数据分析业务。
631 0
|
6月前
|
canal 消息中间件 关系型数据库
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
266 0
|
数据采集 Cloud Native 关系型数据库
实现业务零停机!NineData的PostgreSQL数据迁移能力解析
NineData推出了PostgreSQL业务不停服数据迁移能力。NineData实现了完全自动化的结构迁移和全量数据迁移,并提供了变更数据的迁移能力。这种能力可以实时监听源PostgreSQL中的变更数据,在完成全量迁移后将变更数据实时复制到目标PostgreSQL,实现源/目标PostgreSQL的动态复制。在PostgreSQL数据迁移过程中,业务可以正常提供服务,无需停服。最终,业务可以根据需求选择对应的时间点切换到目标PostgreSQL。
606 1
|
6月前
|
存储 监控 负载均衡
TiDB数据迁移工具TiCDC:高效同步的引擎
【2月更文挑战第28天】TiCDC是TiDB生态中一款强大的数据迁移工具,它专注于实现TiDB增量数据的实时同步。通过解析上游TiKV的数据变更日志,TiCDC能够将有序的行级变更数据输出到下游系统,确保数据的实时性和一致性。本文将深入探讨TiCDC的原理、架构、应用场景以及使用方式,帮助读者更好地理解和应用这一工具,实现高效的数据迁移和同步。