查看精彩回放:https://developer.aliyun.com/live/45078
内容简要
一、背景
二、MongoShake基本原理
三、经典案例
一、背景
(一)MongoDB基本概念
MongoDB基本概念主要有三个:Oplog、ReplicaSet和Replication。
- Oplog
Oplog是用于存放MongoDB增量数据,类似于MySQL的Binlog,每一个写操作都会产生对应的Oplog,例如:
上方就是Oplog的基本格式。
- ReplicaSet
副本集形态,包括Primary、Secondary、Hidden等角色,Secondary从Primary复制。
例如一主一从还有一个Hidden,默认情况下,Secondary都是从Primary进行复制的,表示数据是从Primary拉取。节点之间会维持一个心跳,如果Primary挂掉的话,Secondary会拉起重新选举成为主节点,即成为一个Primary,然后提供服务。
通常来说Primary可以提供读写服务,Secondary提供读服务。
- Replication
复制:包括全量数据的复制和增量数据的复制。
- 全量复制:扫描Primary表,写入Secondary。
- 增量复制:基于Oplog拉取并写入Secondary。
(二)副本集容灾
当ReplicaSet发生单节点故障,由于容灾机制,工作会正常进行。
例如,如果Primary节点Down掉的话,会有一个Secondary重新选举,然后成为一个新的Primary节点,保证服务正常提供。同理,如果Secondary或Secondary(Hidden)节点挂的话也不影响运行。无论三个节点单独挂掉哪一个单独节点,都不影响业务正常运行。
但如果ReplicaSet挂掉超过半数的节点,则无法继续提供服务。
通常来说,数据库部署在同一个机房中,机房如果发生一些非常规情况,如断电,着火,光纤挖断,甚至地震等小概率事件,则存在较高风险。因此,异地容灾十分有必要,如果某地机房无法使用后,可以整体快速切换到异地备灾机房,保证业务稳定开展。
二、MongoShake基本原理
(一)全量与增量衔接
如上图所示,例如用户在8点启动了MongoShake,此时会进行全量同步。假设用户在9点时结束了全量同步,那么用户的增量同步从9点开始的,但用户同步的Oplog还是会从8点开始拉取。
这是由于拉取的数据对于MoangoDB来说不是SnapShot的镜像数据,期间数据可能会变动,因此Oplog需要从8点开始拉取,从而实现数据的衔接。
(二)数据流图
上方为数据流内部架构图。
方框内全量同步直接把全量数据dump下载下来,Fetcher推给Write进程进行编写。写的时候会把多条数据Batch到一起写,从而增大写入的吞吐,写前或者写后都可以进行建索引(Build Indexes)的操作。
增量过程直接tail Oplog不断拉取数据,拉取后经过Fetcher、Bather、Worker等几个流,最后Replayer写到MoangoDB中。此外,用户不止可以写入MoangoDB,也可以写入Kafka、Tcp、Rpc等模式,也提供用户自定义模式。
外部流程的同步工具有监控可以监控同步状况,HA用来进行实时切换,流量监控,全量校验,以及断点续传管理器可以解决增量同步时发生断点的情况。
(三)全量拉取
全量同步原理是多线程并发拉取并写入。
首先按表并发拉取,用户可以一次性拉取多个表,表内的多个文档会聚合成一个Batch然后写入队列。多个Write线程会从队列里进行并发拉取,然后写入到目的库,全量拉取之前和之后都可以建立一个索引。
(四)增量同步
相比全量同步,增量同步细节点更多,因此实现过程会复杂一些。
首先Fetcher负责从源端拉取Oplog,上方图中的数字代表Oplog序号。数据取完后会推给不同的Decoder进行解析,解析的过程中需要对数据进行保序,防止数据错乱。数据解析完成之后传给Batcher按序进行聚合。
Batcher完成后,用户可以选择用id或者ns方式进行Hash,之后发给多个Worker线程,之后数据可以根据需求进行校验、压缩等操作,然后写入不同通道。Replayer负责拉取之前写入的不同数据,然后进行对称操作,之后经过batch write写入目的库。
(五)哈希原理
上方提到,从Batcher到Worker有id和ns两种Hash方式,两种方式都有各自的优缺点,默认情况是按照ns方式。
1.按ns方式进行Hash
Hash by namespace: db.collection
crc32(ns) % n
优点
1)同一个表内部保证有序;
2)表数目多的情况下,能保证较好的并发控制。
缺点
单表倾斜,同步性能将会退化。
2.按id方式进行Hash
Hash by document primary id in Oplog._id
_id % n
优点
1) 所有worker负载比较均衡;
2) 不会存在大表倾斜的问题;
3) 同一个_id文档操作保证有序。
缺点
1) 不同_id文档没办法保证有序,存在幻读;
2) 存在唯一索引情况下,没办法并发。
关于缺点的第2点,下面举一个例子。
如上图所示,用户在a上面建立了唯一索引。此时如果按照_id进行并发,由于个a:1的_id可能不一样,在并发过程中,后方的a:1可能会跑到前方的a:1前面,最后输出的数据只有一条。
v.2.4.12版本优化:部分表按_id哈希,部分表按namespace哈希。
(六)DDL处理
当用户建库、建索引、删表,索引无法跟原来的文档同步,完全地进行并发。
例如,先建表然后插数据,和先插数据再建表,最后产生的结果完全不一样。
如上图所示,当出现C,表示command操作,在MangoDB里表示DDL操作,此时我们需要在C后面添加全局barrier屏障,保证前面的数据同步完成后,撤掉C后面的全局barrier屏障,在C前面添加全局barrier屏障,然后同步后面的数据。这样可以在保证正确性的前提下,提高并发。
(七)Checkpoint原理
Checkpoint
1) 记录同步的位点,用于挂掉重启后,从上次记录的位点重新开始同步;
2) 对于DML是幂等的,重复回放不会有问题;
3) 对于DDL,每次同步将会强制刷新Checkpoint;
4) 为了保证性能,定期存储Checkpoint。
Checkpoint是基于LSN实现的,如上图所示,一个方框就代表一条Oplog。
LSN
1) LSN:Oplog的序列号,由ts时间戳字段解析,同一个副本集内保证唯一;
2) LSN_ACK:标记已经成功写入的最大的LSN;
3) LSN_UNACK:标记已经tunnel中传输成功,但是还没有写入MongoDB;
- 有些时候,比如对于非direct通道,需要重传从LSN_ACK到LSN_UNACK之间的Oplog
4) LSN_CKPT标记Checkpoint,默认持久化在源库MongoDB的mongoshake.ckpt_default表内; - 一旦发生重启,LSN_CKPT开始的数据将全部重传
5) 默认约束:LSN_UNACK >= LSN_ACK >= LSN_CKPT
(八)集群版架构—同步模式
集群版有多个Shard并行,对于同步有两种模式,一种是Oplog模式,需要需要关闭Balancer。另一种是Change Stream模式,无需关闭Balancer。
- Oplog模式同步(需要关闭Balancer)
Oplog模式启动了多个线程,每个线程对应一个Shard进行拉取,它的一个约束条件是需要关闭Balancer。
- Change Stream模式同步(无需关闭Balancer)
Change Stream模式直接从MangoS上拉取数据,无需关闭Balancer,建议集群版用户在增量同步时使用。
集群版开启Balancer下MangoShake同步存在什么问题?以下举例说明。
如上图所示,有Shard1和Shard2,Shard1上update一条数据,之后通过Move Chunk挪到Shard2上面。后来在Shard2上update一条数据,此时a=3。
预期的结果是先Shard1,再Shard2,最后得到a=3。
如果不关闭Balancer,则实际的结果可能是先执行Shard2上的oplog,再执行Shard1上,最后就得到a=1,破坏了一致性。
(九)解决集群版Move Chunk的问题
为什么Change Stream拉取可以不关闭Balancer?
Change Stream是MongoDB在3.6版本后推出的特性,如上图所示,对于这种场景,它能解决顺序先后性问题。如Shard1上有3个Chunk,里面可能有很多Oplog,Shard2和Shard3上同理,它是解决这种解决就是对于这种场景,当前这个场景它是能够解决顺序的先后性的问题。
Change Stream对3个Shard会建立3个对应的Cursor,Cursor在对应的Shard里进行数据拉取。如果Shard1的Chunk1移动到Shard2的Chunk1,在并发拉取的时候,Change Stream对Move Chunk可以进行保序。
(十)延迟同步
另外MongoShake支持延迟同步,例如加了一个时延控制器,设置Oplog3延迟1小时,则需要等待1小时后才能进行。
(十一)多Tunnel对接
如上图所示,MongoShake拉取后可以写到多个Tunnel中。
目前大多数用户使用的是Direct Tunnel,拉取后直接写到目的端。部分用户要求通过Receiver进行对接,如需要写RPC、TCP等模式,目前对接过程较为繁琐。Collector直接写入Kafka,用户可以直接从中进行下一轮消费,每条Kafka数据都是单条json格式。
(十二)监控 – 增量同步如何知道目前瓶颈
如何监控MongoShake?下面以监控增量同步瓶颈为例。
如上图所示,Syncer、Worker、Executor为内部多个线程,线程写到队列中,用户可以通过内部队列判断同步的瓶颈。
如果用户发现队列1经常处于未满状态,后面的队列都满了,则瓶颈为拉取。如果队列2满了,则说明Syncer到Worker之间的数据解析存在一定瓶颈。如果队列3满了,则表示内部Worker到Executor的过程。队列4满的话,则表示写入端会成为瓶颈,通常瓶颈会发生在队列4这个地方。
具体监控指标可以查看github wiki监控文档。
三、应用场景
(一)功能概述
数据同步
1) 容灾:同城容灾、异地容灾
2) 迁移:跨版本、跨形态、跨云、跨地域
3) 多活:数据多向复制,各地可写
4) 分析:基于数据的实时分析、监控、告警等
5) 异构:MoangoDB数据转换为其他的数据库形态
6) 混合云:构建混合云平台
(二)案例
1.高德地图
高德地图在全国部署了华北、华东和华南三个机房,三地数据库搭建了6向MongoShake的复制链路,每一个地方都是可写的。比如写到华北的数据会同步到华东和华南,写到华东会同步到华北和华南,同理,写到华南会同步到华东和华北,中间链路采用MongoShake进行实现。
下面介绍如果发生容灾场景是怎么做的。
如上图所示,两个机房通过MongoShake双向同步,上面有个北向路由层,它决定哪条数据会发送到哪个机房进行路由的哈希。
假设左边机房发生宕机或者不可服务,路由的链路可以直接把发到左边机房的流量切到右边机房,从而实现数据容灾。
2.阿里云BLS服务
上面是阿里云云上之前售卖的BLS服务,是根据MongoShake搭建而成,用户自己可以购买两个数据库,然后购买 BLS的服务,然后直接可以帮用户搭建出一套双向的来回链路。
实现的原理是对于每一个集群都有一个Collector的进程,负责进行拉取,同时每个Collector有一个容灾的备进程。数据拉取完写进Kafka通道中,Receiver从Kafka拉取数据后写入目的端。中间有个中控Manager,会对服务进行定期保活、HA机制、监控等。
3.某电商客户数据分析
上图为某个电商客户根据MongoShake基于分析数据场景搭建的单向同步链路。用户在美国部署一个MoangoDB的集群提供读写服务,用户把MoangoDB的数据拉取到国内,然后进行读服务与数据分析。
4.某游戏客户容灾场景
如上图所示,某位游戏客户用户在两个机房里面搭建应一套数据库和业务,整套业务和数据库都是在同一个数据中心里,然后数据的分发也是根据北向路由层进行访问与切流。
在数据同步的时候,如果左边的机房挂掉,流量可以直接通过SLB切到右边的机房,实现容灾效果。
5.某游戏客户延迟同步,实时回滚场景
上图是某游戏客户用MongoShake做延迟同步的场景。
正常情况下,用户请求写到源库里面中,MongoShake可以构建延迟同步,延迟一个小时或两个小时。如果左边的源库发生宕机情况,用户可以把流量直接切到延迟库上,把数据回滚到延迟之前,相当于秒级RTO回滚数据。
6.全球级联同步场景
上图是某全球客户,通过MongoShake搭建全球级联场景,从新加坡到北京,从北京到孟买,然后再从孟买到法兰克福,实现全球级联场景。
7.监控分析场景
上方为监控分析的场景,用户采用MongoShake把数据从数据库拉取出来,然后写到Kafka,用Receiver读取完后推到下游的一些监控平台或者分析平台。