走进MongoShake

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 阿里云数据库NoSQL团队技术专家烛昭在阿里云开发者社区特别栏目《周二开源日》直播中,介绍了MongoShake的基本原理,并结合典型的应用案例介绍MongoShake的应用场景。本文为直播内容文字整理,看直播回放,请点击文首链接~

查看精彩回放:https://developer.aliyun.com/live/45078

内容简要

一、背景

二、MongoShake基本原理

三、经典案例


一、背景

(一)MongoDB基本概念

MongoDB基本概念主要有三个:Oplog、ReplicaSet和Replication。


  1. Oplog
    Oplog是用于存放MongoDB增量数据,类似于MySQL的Binlog,每一个写操作都会产生对应的Oplog,例如:

1.png


上方就是Oplog的基本格式。


  1. ReplicaSet
    副本集形态,包括Primary、Secondary、Hidden等角色,Secondary从Primary复制。

1.png


例如一主一从还有一个Hidden,默认情况下,Secondary都是从Primary进行复制的,表示数据是从Primary拉取。节点之间会维持一个心跳,如果Primary挂掉的话,Secondary会拉起重新选举成为主节点,即成为一个Primary,然后提供服务。

通常来说Primary可以提供读写服务,Secondary提供读服务。


  1. Replication
    复制:包括全量数据的复制和增量数据的复制。


  • 全量复制:扫描Primary表,写入Secondary。
  • 增量复制:基于Oplog拉取并写入Secondary。


(二)副本集容灾

1.png

当ReplicaSet发生单节点故障,由于容灾机制,工作会正常进行。


例如,如果Primary节点Down掉的话,会有一个Secondary重新选举,然后成为一个新的Primary节点,保证服务正常提供。同理,如果Secondary或Secondary(Hidden)节点挂的话也不影响运行。无论三个节点单独挂掉哪一个单独节点,都不影响业务正常运行。


但如果ReplicaSet挂掉超过半数的节点,则无法继续提供服务。

1.png

通常来说,数据库部署在同一个机房中,机房如果发生一些非常规情况,如断电,着火,光纤挖断,甚至地震等小概率事件,则存在较高风险。因此,异地容灾十分有必要,如果某地机房无法使用后,可以整体快速切换到异地备灾机房,保证业务稳定开展。


二、MongoShake基本原理

(一)全量与增量衔接

1.png

如上图所示,例如用户在8点启动了MongoShake,此时会进行全量同步。假设用户在9点时结束了全量同步,那么用户的增量同步从9点开始的,但用户同步的Oplog还是会从8点开始拉取。


这是由于拉取的数据对于MoangoDB来说不是SnapShot的镜像数据,期间数据可能会变动,因此Oplog需要从8点开始拉取,从而实现数据的衔接。


(二)数据流图

1.png

上方为数据流内部架构图。


方框内全量同步直接把全量数据dump下载下来,Fetcher推给Write进程进行编写。写的时候会把多条数据Batch到一起写,从而增大写入的吞吐,写前或者写后都可以进行建索引(Build Indexes)的操作。


增量过程直接tail Oplog不断拉取数据,拉取后经过Fetcher、Bather、Worker等几个流,最后Replayer写到MoangoDB中。此外,用户不止可以写入MoangoDB,也可以写入Kafka、Tcp、Rpc等模式,也提供用户自定义模式。


外部流程的同步工具有监控可以监控同步状况,HA用来进行实时切换,流量监控,全量校验,以及断点续传管理器可以解决增量同步时发生断点的情况。


(三)全量拉取

1.png

全量同步原理是多线程并发拉取并写入。


首先按表并发拉取,用户可以一次性拉取多个表,表内的多个文档会聚合成一个Batch然后写入队列。多个Write线程会从队列里进行并发拉取,然后写入到目的库,全量拉取之前和之后都可以建立一个索引。


(四)增量同步

1.png

相比全量同步,增量同步细节点更多,因此实现过程会复杂一些。


首先Fetcher负责从源端拉取Oplog,上方图中的数字代表Oplog序号。数据取完后会推给不同的Decoder进行解析,解析的过程中需要对数据进行保序,防止数据错乱。数据解析完成之后传给Batcher按序进行聚合。


Batcher完成后,用户可以选择用id或者ns方式进行Hash,之后发给多个Worker线程,之后数据可以根据需求进行校验、压缩等操作,然后写入不同通道。Replayer负责拉取之前写入的不同数据,然后进行对称操作,之后经过batch write写入目的库。


(五)哈希原理

上方提到,从Batcher到Worker有id和ns两种Hash方式,两种方式都有各自的优缺点,默认情况是按照ns方式。

1.png

1.按ns方式进行Hash

 Hash by namespace: db.collection

 crc32(ns) % n

 优点

1)同一个表内部保证有序;

2)表数目多的情况下,能保证较好的并发控制。

 缺点

单表倾斜,同步性能将会退化。


2.按id方式进行Hash

 Hash by document primary id in Oplog._id

 _id % n

 优点

1) 所有worker负载比较均衡;

2) 不会存在大表倾斜的问题;

3) 同一个_id文档操作保证有序。

 缺点

1) 不同_id文档没办法保证有序,存在幻读;

2) 存在唯一索引情况下,没办法并发。

关于缺点的第2点,下面举一个例子。

1.png


如上图所示,用户在a上面建立了唯一索引。此时如果按照_id进行并发,由于个a:1的_id可能不一样,在并发过程中,后方的a:1可能会跑到前方的a:1前面,最后输出的数据只有一条。


v.2.4.12版本优化:部分表按_id哈希,部分表按namespace哈希。


(六)DDL处理

当用户建库、建索引、删表,索引无法跟原来的文档同步,完全地进行并发。

例如,先建表然后插数据,和先插数据再建表,最后产生的结果完全不一样。

1.png

如上图所示,当出现C,表示command操作,在MangoDB里表示DDL操作,此时我们需要在C后面添加全局barrier屏障,保证前面的数据同步完成后,撤掉C后面的全局barrier屏障,在C前面添加全局barrier屏障,然后同步后面的数据。这样可以在保证正确性的前提下,提高并发。


(七)Checkpoint原理

 Checkpoint

1) 记录同步的位点,用于挂掉重启后,从上次记录的位点重新开始同步;

2) 对于DML是幂等的,重复回放不会有问题;

3) 对于DDL,每次同步将会强制刷新Checkpoint;

4) 为了保证性能,定期存储Checkpoint。

1.png

Checkpoint是基于LSN实现的,如上图所示,一个方框就代表一条Oplog。

 LSN

1) LSN:Oplog的序列号,由ts时间戳字段解析,同一个副本集内保证唯一;

2) LSN_ACK:标记已经成功写入的最大的LSN;

3) LSN_UNACK:标记已经tunnel中传输成功,但是还没有写入MongoDB;


  • 有些时候,比如对于非direct通道,需要重传从LSN_ACK到LSN_UNACK之间的Oplog
    4) LSN_CKPT标记Checkpoint,默认持久化在源库MongoDB的mongoshake.ckpt_default表内;
  • 一旦发生重启,LSN_CKPT开始的数据将全部重传
    5) 默认约束:LSN_UNACK >= LSN_ACK >= LSN_CKPT


(八)集群版架构—同步模式

集群版有多个Shard并行,对于同步有两种模式,一种是Oplog模式,需要需要关闭Balancer。另一种是Change Stream模式,无需关闭Balancer。


  1. Oplog模式同步(需要关闭Balancer)

1.png

Oplog模式启动了多个线程,每个线程对应一个Shard进行拉取,它的一个约束条件是需要关闭Balancer。


  1. Change Stream模式同步(无需关闭Balancer)

1.png

Change Stream模式直接从MangoS上拉取数据,无需关闭Balancer,建议集群版用户在增量同步时使用。


集群版开启Balancer下MangoShake同步存在什么问题?以下举例说明。

1.png

如上图所示,有Shard1和Shard2,Shard1上update一条数据,之后通过Move Chunk挪到Shard2上面。后来在Shard2上update一条数据,此时a=3。


预期的结果是先Shard1,再Shard2,最后得到a=3。


如果不关闭Balancer,则实际的结果可能是先执行Shard2上的oplog,再执行Shard1上,最后就得到a=1,破坏了一致性。


(九)解决集群版Move Chunk的问题

1.png

为什么Change Stream拉取可以不关闭Balancer?


Change Stream是MongoDB在3.6版本后推出的特性,如上图所示,对于这种场景,它能解决顺序先后性问题。如Shard1上有3个Chunk,里面可能有很多Oplog,Shard2和Shard3上同理,它是解决这种解决就是对于这种场景,当前这个场景它是能够解决顺序的先后性的问题。


Change Stream对3个Shard会建立3个对应的Cursor,Cursor在对应的Shard里进行数据拉取。如果Shard1的Chunk1移动到Shard2的Chunk1,在并发拉取的时候,Change Stream对Move Chunk可以进行保序。


(十)延迟同步

1.png

另外MongoShake支持延迟同步,例如加了一个时延控制器,设置Oplog3延迟1小时,则需要等待1小时后才能进行。


(十一)多Tunnel对接

1.png

如上图所示,MongoShake拉取后可以写到多个Tunnel中。


目前大多数用户使用的是Direct Tunnel,拉取后直接写到目的端。部分用户要求通过Receiver进行对接,如需要写RPC、TCP等模式,目前对接过程较为繁琐。Collector直接写入Kafka,用户可以直接从中进行下一轮消费,每条Kafka数据都是单条json格式。


(十二)监控 – 增量同步如何知道目前瓶颈

如何监控MongoShake?下面以监控增量同步瓶颈为例。

1.png

如上图所示,Syncer、Worker、Executor为内部多个线程,线程写到队列中,用户可以通过内部队列判断同步的瓶颈。


如果用户发现队列1经常处于未满状态,后面的队列都满了,则瓶颈为拉取。如果队列2满了,则说明Syncer到Worker之间的数据解析存在一定瓶颈。如果队列3满了,则表示内部Worker到Executor的过程。队列4满的话,则表示写入端会成为瓶颈,通常瓶颈会发生在队列4这个地方。


具体监控指标可以查看github wiki监控文档。


三、应用场景

(一)功能概述

 数据同步

1) 容灾:同城容灾、异地容灾

2) 迁移:跨版本、跨形态、跨云、跨地域

3) 多活:数据多向复制,各地可写

4) 分析:基于数据的实时分析、监控、告警等

5) 异构:MoangoDB数据转换为其他的数据库形态

6) 混合云:构建混合云平台


(二)案例

1.高德地图

1.png

高德地图在全国部署了华北、华东和华南三个机房,三地数据库搭建了6向MongoShake的复制链路,每一个地方都是可写的。比如写到华北的数据会同步到华东和华南,写到华东会同步到华北和华南,同理,写到华南会同步到华东和华北,中间链路采用MongoShake进行实现。


下面介绍如果发生容灾场景是怎么做的。

1.png

如上图所示,两个机房通过MongoShake双向同步,上面有个北向路由层,它决定哪条数据会发送到哪个机房进行路由的哈希。

1.png

假设左边机房发生宕机或者不可服务,路由的链路可以直接把发到左边机房的流量切到右边机房,从而实现数据容灾。


2.阿里云BLS服务

1.png

上面是阿里云云上之前售卖的BLS服务,是根据MongoShake搭建而成,用户自己可以购买两个数据库,然后购买 BLS的服务,然后直接可以帮用户搭建出一套双向的来回链路。


实现的原理是对于每一个集群都有一个Collector的进程,负责进行拉取,同时每个Collector有一个容灾的备进程。数据拉取完写进Kafka通道中,Receiver从Kafka拉取数据后写入目的端。中间有个中控Manager,会对服务进行定期保活、HA机制、监控等。


3.某电商客户数据分析

1.png

上图为某个电商客户根据MongoShake基于分析数据场景搭建的单向同步链路。用户在美国部署一个MoangoDB的集群提供读写服务,用户把MoangoDB的数据拉取到国内,然后进行读服务与数据分析。


4.某游戏客户容灾场景

1.png

如上图所示,某位游戏客户用户在两个机房里面搭建应一套数据库和业务,整套业务和数据库都是在同一个数据中心里,然后数据的分发也是根据北向路由层进行访问与切流。


在数据同步的时候,如果左边的机房挂掉,流量可以直接通过SLB切到右边的机房,实现容灾效果。


5.某游戏客户延迟同步,实时回滚场景

1.png

上图是某游戏客户用MongoShake做延迟同步的场景。


正常情况下,用户请求写到源库里面中,MongoShake可以构建延迟同步,延迟一个小时或两个小时。如果左边的源库发生宕机情况,用户可以把流量直接切到延迟库上,把数据回滚到延迟之前,相当于秒级RTO回滚数据。


6.全球级联同步场景

1.png

上图是某全球客户,通过MongoShake搭建全球级联场景,从新加坡到北京,从北京到孟买,然后再从孟买到法兰克福,实现全球级联场景。


7.监控分析场景

1.png

上方为监控分析的场景,用户采用MongoShake把数据从数据库拉取出来,然后写到Kafka,用Receiver读取完后推到下游的一些监控平台或者分析平台。

相关文章
|
5月前
|
数据采集 监控 关系型数据库
CDC 与 Oceanbase 的激情碰撞:实时采集数据的震撼之旅,颠覆数据世界的神秘冒险!
【8月更文挑战第7天】在数据处理领域,实时采集变得至关重要。OceanBase是一款高性能、可扩展的分布式数据库。通过变更数据捕获(CDC)技术实时采集其数据是一项关键技术。利用如Debezium等工具,可以实现OceanBase的数据变动捕捉。示例代码展示了如何配置Debezium以监听OceanBase的数据变更。实际应用中需按业务需求定制数据处理逻辑,并实施监控与错误管理以保障采集的准确性和稳定性,从而为业务提供实时数据支持,推动创新发展。
117 1
|
5月前
|
存储 SQL 运维
“震撼发布!PolarDB-X:云原生分布式数据库巨擘,超高并发、海量存储、复杂查询,一网打尽!错过等哭!”
【8月更文挑战第7天】PolarDB-X 是面向超高并发、海量存储和复杂查询场景设计的云原生分布式数据库系统
118 1
|
5月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
899 0
|
SQL 存储 分布式计算
【国庆弯道超车系列】NoSQL基础及MongoDB入门安装
【国庆弯道超车系列】NoSQL基础及MongoDB入门安装
174 0
|
SQL 存储 JSON
【国庆弯道超车系列】MongoDB入门基础知识
【国庆弯道超车系列】MongoDB入门基础知识
102 0
|
缓存 监控 供应链
36【学习心得】学习心得-数据同步
【学习心得】学习心得-数据同步
36【学习心得】学习心得-数据同步
|
运维 容灾 分布式数据库
一年一度 OceanBase 技术征文大赛全面开启! 入门实战,等您来写
为了让更多用户能够真正体验到 OceanBase 数据库,Gitee 联合 OceanBase 发起一年一度 OceanBase 技术征文大赛,邀请广大开发者体验从部署到迁移的各种玩法,第一期征文主题将围绕《OceanBase 社区版入门到实战》课程学习与实践。
|
运维 Cloud Native 关系型数据库
PolarDB for PostgreSQL开源训练营,学习还能抢AirPods等大奖!
2021年5月29日阿里云开发者大会上,阿里云宣布正式开源云原生数据库能力,开放关系型数据库 PolarDB for PostgreSQL的源代码。本次阿里云开源的是PolarDB for PostgreSQL分布式版,包括数据库内核、相关插件、工具脚本、测试用例以及设计文档,适用于中大型企业核心业务场景。PolarDB for PostgreSQL是阿里云自主研发的云原生数据库,采用基于 Shared-Storage 的存储计算分离架构,具有极致弹性、毫秒级延迟、HTAP 的能力。
228 0
PolarDB for PostgreSQL开源训练营,学习还能抢AirPods等大奖!
|
存储 NoSQL 关系型数据库
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(上)
【大厂技术内幕】字节跳动原来是这么做数据迁移的!
409 0
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(上)
|
存储 分布式数据库 Hbase
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(中)
【大厂技术内幕】字节跳动原来是这么做数据迁移的!
176 0
【大厂技术内幕】字节跳动原来是这么做数据迁移的!(中)