使用DynamoShake从dynamodb迁移到mongodb-阿里云开发者社区

开发者社区> 烛昭> 正文

使用DynamoShake从dynamodb迁移到mongodb

简介: 去年和今年年初,我们开源了MongoShake和RedisShake分别用于MongoDB和Redis的迁移、同步、备份等多种需求。最近,我们的shake系列又进一步壮大,我们推出了一款dynamodb迁移的工具:dynamo-shake。
+关注继续查看

去年和今年年初,我们开源了MongoShakeRedisShake分别用于MongoDB和Redis的迁移、同步、备份等多种需求。最近,我们的shake系列又进一步壮大,我们推出了一款dynamodb迁移的工具:dynamo-shake(又名nimo-shake)。目前支持从dynamodb迁移到MongoDB,后续我们还会考虑支持多种通道,比如直接文件备份、迁移至kafka,或者迁移到别的数据库如cassandra,redis等。
github地址:https://github.com/alibaba/nimoshake。里面可以找到下载的链接。

DynamoShake基本功能

DynamoDB支持全量和增量的同步,进程启动后会先进行全量同步,全量同步结束后进入增量同步的阶段。
全量同步分为数据同步和索引同步两部分,数据同步用于同步数据,数据同步结束后将会进行索引的同步,索引同步会同步默认的primary key,用户自建的索引GSI如果MongoDB是副本集支持,集群版目前暂时不支持同步。
增量同步只同步数据,不同步增量同步过程中产生的索引。
此外,全量和增量同步阶段不支持对原来的库表进行DDL操作,比如删表,建表,建索引等。
image

断点续传

全量同步不支持断点续传功能,增量同步支持断点续传,也就是说如果增量断开了,一定时间内恢复是可以只进行增量的断点续传。但在某些情况下,比如断开的时间过久,或者之前的位点(参考下文)丢失,那么都会导致重新触发全量同步。

同步数据

所有源端的表会写入到目的的一个库(默认是dynamo-shake)的不同表中,比如用户有table1,table2,那么同步完后,目的端会有个dynamo-shake的库,库里面有table1和table2的表。
在原生的dynamodb中,协议是包裹了一层类型字段,其格式是“key: type: value”格式,例如用户插入了一条{hello: 1},那么dynamodb接口获取的数据是{"hello": {"N": 1}}的格式。
Dynamo所有的数据类型:

  • String
  • Binary
  • Number
  • StringSet
  • NumberSet
  • BinarySet
  • Map
  • List
  • Boolean
  • Null

那么我们提供2种转换方式,raw和change,其中raw就是按照裸的dynamodb接口获取的数据写入:

rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : { "L" : [ { "S" : "aa1" }, { "N" : "1234" } ] }, "hello_world" : { "S" : "f2" } }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : { "N" : "222" }, "qqq" : { "SS" : [ "h1", "h2" ] }, "hello_world" : { "S" : "yyyyyyyyyyy" }, "test" : { "S" : "aaa" } }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : { "L" : [ { "N" : "0" }, { "N" : "1" }, { "N" : "2" } ] }, "hello_world" : { "S" : "测试中文" } }

change表示剥离类型字段:

rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : [ "aa1", 1234 ] , "hello_world" : "f2" }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : 222, "qqq" : [ "h1", "h2" ] , "hello_world" : "yyyyyyyyyyy", "test" : "aaa" }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : [ 0, 1, 2 ], "hello_world" : "测试中文" }

用户可以根据自己的需求制定自己的同步类型。

位点

增量的断点续传是根据位点来实现的,默认的位点是写入到目的MongoDB中,库名是dynamo-shake-checkpoint。每个表都会记录一个checkpoint的表,同样还会有一个status_table表记录当前是全量同步还是增量同步阶段。

rszz-4.0-2:PRIMARY> use dynamo-shake42-checkpoint
switched to db dynamo-shake42-checkpoint
rszz-4.0-2:PRIMARY> show collections
status_table
zz_incr0
zz_incr1
rszz-4.0-2:PRIMARY>
rszz-4.0-2:PRIMARY>
rszz-4.0-2:PRIMARY> db.status_table.find()
{ "_id" : ObjectId("5d6e0ef77e592206a8c86bfd"), "key" : "status_key", "status_value" : "incr_sync" }
rszz-4.0-2:PRIMARY> db.zz_incr0.find()
{ "_id" : ObjectId("5d6e0ef17e592206a8c8643a"), "shard_id" : "shardId-00000001567391596311-61ca009c", "father_id" : "shardId-00000001567375527511-6a3ba193", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c8644c"), "shard_id" : "shardId-00000001567406847810-f5b6578b", "father_id" : "shardId-00000001567391596311-61ca009c", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c86456"), "shard_id" : "shardId-00000001567422218995-fe7104bc", "father_id" : "shardId-00000001567406847810-f5b6578b", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c86460"), "shard_id" : "shardId-00000001567438304561-d3dc6f28", "father_id" : "shardId-00000001567422218995-fe7104bc", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c8646a"), "shard_id" : "shardId-00000001567452243581-ed601f96", "father_id" : "shardId-00000001567438304561-d3dc6f28", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c86474"), "shard_id" : "shardId-00000001567466737539-cc721900", "father_id" : "shardId-00000001567452243581-ed601f96", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef27e592206a8c8647e"), "shard_id" : "shardId-00000001567481807517-935745a3", "father_id" : "shardId-00000001567466737539-cc721900", "seq_num" : "", "status" : "done", "worker_id" : "unknown-worker", "iterator_type" : "LATEST", "shard_it" : "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAGsTOg0+3HY+yzzD1cTzc7TPXi/iBi7sA5Q6SGSoaAJ2gz2deQu5aPRW/flYK0pG9ZUvmCfWqe1A5usMFWfVvd+yubMwWSHfV2IPVs36TaQnqpMvsywll/x7IVlCgmsjr6jStyonbuHlUYwKtUSq8t0tFvAQXtKi0zzS25fQpITy/nIb2y/FLppcbV/iZ+ae1ujgWGRoojhJ0FiYPhmbrR5ZBY2dKwEpok+QeYMfF3cEOkA4iFeuqtboUMgVqBh0zUn87iyTFRd6Xm49PwWZHDqtj/jtpdFn0CPoQPj2ilapjh9lYq/ArXMai5DUHJ7xnmtSITsyzUHakhYyIRXQqF2UWbDK3F7+Bx5d4rub1d4S2yqNUYA2eZ5CySeQz7CgvzaZT391axoqKUjjPpdUsm05zS003cDDwrzxmLnFi0/mtoJdGoO/FX9LXuvk8G3hgsDXBLSyTggRE0YM+feER8hPgjRBqbfubhdjUxR+VazwjcVO3pzt2nIkyKPStPXJZIf4cjCagTxQpC/UPMtcwWNo2gQjM2XSkWpj7DGS2E4738biV3mtKXGUXtMFVecxTL/qXy2qpLgy4dD3AG0Z7pE+eJ9qP5YRE6pxQeDlgbERg==", "update_date" : "" }
{ "_id" : ObjectId("5d6e1d807e592206a8c9a102"), "shard_id" : "shardId-00000001567497561747-03819eba", "father_id" : "shardId-00000001567481807517-935745a3", "seq_num" : "39136900000000000325557205", "status" : "in processing", "worker_id" : "unknown", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "arn:aws:dynamodb:us-east-2:240770237302:table/zz_incr0/stream/2019-08-27T08:23:51.043|1|AAAAAAAAAAFw/qdbPLjsXMlPalnhh55koia44yz6A1W2uwUyu/MzRUhaaqnI0gPM8ebVgy7dW7dDWLTh/WXYyDNNyXR3Hvk01IfEKDf+FSLMNvh2iELdrO5tRoLtZI2fxxrPZvudRc3KShX0Pvqy2YYwl4nlBR6QezHTWx5H2AU22MGPTx8aMRbjUgPwvgEExRgdzfhG6G9gkc7C71fIc98azwpSm/IW+mV/h/doFndme47k2v8g0GNJvgLSoET7HdJYH3XFdqh4QVDIP4sbz8X1cpN3y8AlT7Muk2/yXOdNeTL6tApuonCrUpJME9/qyBYQVI5dsAHnAWaP2Te3EAvz3ao7oNdnA8O6uz5VF9zFdN1OUHWM40kLUsX4sHve7McEzFLgf4NL1WTAnPN13cFhEm9BS8M7tiJqZ0OzgkbF1AWfq+xg/O6c57/Vvx/G/75DZ8XcWIABgGNkWBET/vLDrgjJQ0PUZJZKNmmbgKKTyHgSl4YOXNEeyH7l6atuc2WaREDjbf7lnQO5No11sz4g3O+AreBcpGVhdZNhGGcrG/wduPYEZfg2hG1sfYiSAM8GipUPMA0PM7JPIJmqCaY90JxRcI1By24tpp9Th35/5rLTGPYJZA==", "update_date" : "" }

其中status_table表中"status_value" : "incr_sync"表示进入了增量阶段。增量的每个shard都会记录一个checkpoint,关于具体shard分裂的规则可以参考dynamodb的guan'fa官方文档。下面是增量表checkpoint的各个字段的说明:

  • _id:mongodb自带主键id
  • shard_id:shard的id,每个shard有一个唯一的id
  • father_id:父shard的id,shard可能有一个父shard。
  • seq_num: 目前处理到的shard内部的sequence number,这个是主要的位点信息。
  • status: 目前同步的阶段,一共有以下几个状态:

    • "not process": 未处理
    • "no need to process": 没有必要处理
    • "prepare stage":准备处理
    • "in processing": 处理中
    • "wait father finish":等待父节点处理完毕再进行处理
    • "done": 处理完毕
  • worker_id:处理的worker id,目前暂未启用
  • iterator_type:shard的遍历方式
  • shard_it:shard的迭代器地址,次要位点信息。
  • update_date:checkpoint更新的时间戳

索引

根据默认的primary key创建一个唯一索引,并且根据partition key创建shard key。用户自己的索引gsi目前不进行创建。

DynamoShake内部架构

本小节主要介绍DynamoShake的部分架构细节

全量同步

下图是基本的一个table的数据同步架构图(dynamo-shake会启动多个并发线程tableSyncer进行拉取,用户可控并发度),fetcher线程从源端dynamodb拉取数据后将数据推入队列,紧接着parser线程从队列中拿取数据并进行解析(dynamo协议转bson),executor负责聚合部分数据并写入mongodb。
image

  • fetcher。目前fetcher线程只有1个,用的是协议转换驱动是aws提供的driver。fetcher的原理是调用driver进行批量抓取源库的数据,抓到了就塞入队列中,直到抓完当前table的所有数据。fetcher单独分离出来主要是出于网络IO考虑的,目前拉取受网络影响,会比较慢。
  • parser。parser可以启动多个,默认目前是2个,用户可以通过FullDocumentParser进行控制。其主要就是从队列中读取数据,并解析成bson结构。parser解析后,数据按条写入executor的队列。parser线程单独独立出来主要是出于解析比较耗CPU资源考虑。
  • executor。executor也可以启动多个,默认目前是4个,用户可以通过FullDocumentConcurrency进行控制。executor从队列中拉取,并进行batch聚合(聚合上限16MB,总条数1024)后写入目的mongodb。
    当前所有表的数据写入完后,tableSyncer将会退出。

增量同步

增量整体架构如下:
image
Fetcher线程负责感知stream中shard的变化,Manager负责进行消息的通知,或者创建新的Dispatcher进行消息的处理,一个shard对应一个Dispatcher。Dispatcher从源端拉取增量数据,并通过Batcher进行数据解析和打包整合,然后通过executor进行写入到MongoDB,同时会更新checkpoint。另外,如果是断点续传,那么Dispatcher会从旧的checkpoint位点开始拉取,而不是从头开始拉。

DynamoShake的使用

启动:./dynamo-shake -conf=dynamo-shake.conf,配置参数在dynamo-shake.conf中指定,以下是各个参数的意义:

  • id: 修改会影响MongoDB上目的库的名字
  • log.file:日志文件,不配置将打印到标准输出
  • log.level: log级别。推荐默认。
  • log.buffer: 打印是否带缓存。推荐默认。
  • system_profile:打印内部堆栈的端口号。推荐默认。
  • http_profile:暂未启用
  • sync_mode:同步模式,all表示全量+增量,full表示仅全量,incr表示仅增量(目前不支持)
  • source.access_key_id: dynamodb连接配置参数
  • source.secret_access_key: dynamodb连接配置参数
  • source.session_token: dynamodb连接配置参数,没有可以留空
  • source.region: dynamodb连接配置参数
  • filter.collection.white:过滤白名单,只同步指定的表
  • filter.collection.black:过滤黑名单,不通过指定的表。
  • qps.full:全量阶段限速,1秒钟发送多少个请求
  • qps.full.batch_num:全量阶段限速,1个请求最多包括多少个item。
  • qps.incr:增量阶段限速,1秒钟发送多少个请求
  • qps.incr.batch_num:增量阶段限速,1个请求最多包括多少个item。
  • target.type:目的端配置,目前仅支持mongodb
  • target.address: 目的端mongodb的连接串地址。
  • target.mongodb.type: mongodb是replica还是sharding
  • target.mongodb.exist:如果目的库同名表存在,执行什么行为。drop表示删除,rename表示重命名,留空表示不处理。
  • full.concurrency:全量同步的线程个数,1个线程对应1个表
  • full.document.concurrency:全量同步1个表内的并发个数。
  • full.document.parser:1个表内parser线程的个数
  • full.enable_index.primary:是否同步dynamodb的primary key。
  • full.enable_index.user:是否同步用户自建的索引,目前不支持
  • convert.type:写入的模式,raw表示裸写入, change表示解析类型字段后写入,参考上述文档。
  • increase.concurrency:增量同步并发参数,1次最多抓取的shard个数
  • checkpoint.address = checkpont的存储地址,默认不配置与目的库一致。
  • checkpoint.db = checkpoint写入的db的名字,默认是$db-checkpoint。

DynamoFullCheck

DynamoFullCheck是一个用于校验DynamoDB和MongoDB数据是否一致的工具,目前仅支持全量校验,不支持增量,也就是说,如果增量同步阶段,那么源和目的是不一致的。
DynamoFullCheck只支持单向校验,也就是校验DynamoDB的数据是否是MongoDB的子集,反向不进行校验。
另外,还支持抽样校验,支持只校验感兴趣的表。
校验主要分为以下几部分:

  • 轮廓校验。首先,校验两边的表中数目是否一致;接着,校验索引是否一致(目前没做索引校验)。注意,如果表中数目不一致,将会直接退出,不会进行后续的校验。
  • 精确校验。精确校验数据,原理是从源端拉取数据并解析,如果有唯一索引,那么根据唯一索引查找MongoDB的doc,并对比一致性;如果没有唯一索引,那么会根据整个doc在MongoDB中进行查找(比较重)。
    抽样原理:

精确校验的时候,如果启用抽样,那么会对每个doc进行抽样,判断当前doc是否需要抽样。原理比较简单,比如按30%抽样,那么再0~100中产生一个随机数,如果是0~30的就校验,反之不校验。
DynamoFullCheck由于从源DynamoDB拉取也需要经过fetch,parse阶段,所以一定程度上,该部分代码复用了DynamoShake,不同的是DynamoFullCheck内部各个fetcher, parser, executor线程并发度都是1。

使用参数

full-check参数稍微简单点,直接用的命令行注入,例如:./dynamo-full-check --sourceAccessKeyID=BUIASOISUJPYS5OP3P5Q --sourceSecretAccessKey=TwWV9reJCrZhHKSYfqtTaFHW0qRPvjXb3m8TYHMe --sourceRegion=ap-east-1 -t="10.1.1.1:30441" --sample=300

Usage:
  dynamo-full-check.darwin [OPTIONS]

Application Options:
  -i, --id=                    target database collection name (default: dynamo-shake)
  -l, --logLevel=
  -s, --sourceAccessKeyID=     dynamodb source access key id
      --sourceSecretAccessKey= dynamodb source secret access key
      --sourceSessionToken=    dynamodb source session token
      --sourceRegion=          dynamodb source region
      --qpsFull=               qps of scan command, default is 10000
      --qpsFullBatchNum=       batch number in each scan command, default is 128
  -t, --targetAddress=         mongodb target address
  -d, --diffOutputFile=        diff output file name (default: dynamo-full-check-diff)
  -p, --parallel=              how many threads used to compare, default is 16 (default: 16)
  -e, --sample=                comparison sample number for each table, 0 means disable (default: 1000)
      --filterCollectionWhite= only compare the given tables, split by ';'
      --filterCollectionBlack= do not compare the given tables, split by ';'
  -c, --convertType=           convert type (default: raw)
  -v, --version                print version

Help Options:
  -h, --help                   Show this help message

其他

10.15已经开源,欢迎关注。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
10053 0
【MongoDB训练营】第五课:ChangeStream使用及原理介绍 答疑汇总
【MongoDB训练营】第五课:ChangeStream使用及原理介绍 答疑汇总
253 0
第12章—使用NoSQL数据库—使用MongoDB+Jpa操作数据库
使用MongoDB+Jpa操作数据库 SpringData还提供了对多种NoSQL数据库的支持,包括MongoDB;neo4j和redis.他不仅支持自动化的repository,还支持基于模板的数据访问和映射注解.
1295 0
MongoDB sharding迁移那些事(二)
如果不了解 MongoDB Sharded Cluster 原理,请先阅读 MongoDB Sharded cluster架构原理 关于MongoDB Sharding,你应该知道的 关于 sharding 迁移,会分3个部分来介绍,本文为第二部分 负载均衡及迁移策略 chunk 迁移
6113 0
【MongoDB训练营】第四课:分片集群的使用及原理介绍 答疑汇总
【MongoDB训练营】第四课:分片集群的使用及原理介绍 答疑汇总
388 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
13867 0
一张图看懂互联网史上最大规模的公共云迁移案例
6月7日,阿里云与国内领先的云存储企业115科技在2018云栖大会·上海峰会上宣布,已将公司全部数据迁移至阿里云上,总量超过100PB。 至此,双方合力完成了互联网史上规模最大的公共云数据迁移,仅耗时45天,创造了百PB级数据公共云迁移的新纪录,而这都要依托于阿里云推出的全新离线数据迁移方案——闪电立方。
1542 0
+关注
烛昭
阿里云数据库团队研发工程师,负责MongoDB相关产品的研发
7
文章
7
问答
来源圈子
更多
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载