
每到年底,比铁路部门还忙碌就是各路小偷,恰巧最近一篇关于NoSQL暴露公网的文章开拓了他们的思路。就在文章发布后这一周里,陆陆续续接到关于自建MongoDB被“黑客”攻击,数据被删,并且索要Q币的案例。今天我自卖自夸下,为了数据库安全,快点上云MongoDB。但正式自夸之前,我们先来复盘看看。 被黑者特征 本次事件的作案手法实在是称不上高级,所以我暂且称他们为“坏人”,但这事不能完全埋怨“坏人”,被攻击的这些自建MongoDB,全部都满足两个特征: 1. 暴露公网地址,甚至有些实例端口号都是默认的27017; 2. 没有配置鉴权约束,谁来都可以访问; 怎么攻击的 这就是把自己家的大门敞开,任由“坏人”搜割,“坏人”会做几件事: 1. 访问shadon网站,本来这个网站是做互联网数据分析的,原理是扫描全球IP和端口,通过分析协议来收集各种设备或者服务的统计数据。不幸被人利用,这些“坏人”甚至连端口扫描都不用,shadon都为你准备好了; 2. 用mongoshell等管理工具尝试登入,因为很多人的MongoDB都没有配置鉴权,所以连破解密码都的步骤都可以省了; 3. 单独破坏数据库意义不大,一般都是把数据dump到本地留存一份,或者直接修改集合名称或者地址,存在一个相对隐蔽的地方;也有“坏人”纯恶意破坏; 4. 勒索受害人,索要Q币或者比特币,交钱放“人”(归还数据,如果有的话); 该埋怨谁 看吧,实在不是很高明的手段,也真的不能完全埋怨“坏人”不讲情理。这事情责任三方各打一板,怎么是三方? 1. “坏人”责任50%,勒索良民; 2. 自己责任40%,谁叫你公网地址,谁叫你不配鉴权的?大门敞开,家里被盗,上哪都喊不了冤; 3. MongoDB责任10%,说起来有点冤,但默认配置就应该强制要求鉴权访问,并且这事情也是需要官方不断去教育用户(虽然官方英文文档明确有写明要配置鉴权来保障安全),但不能指望人人都有安全意识;另外,MongoDB 3.0开始修改了鉴权协议,不支持2.x的Driver,很多用户也因此懒得去升级客户端,索性不要鉴权; 亡羊补牢 事实已经发生了,埋怨没有用,怎么解决: 1. 不论你有没有中招,有没有在公网,都要检查鉴权配置,避免更多的损失; 2. 关闭公网的访问入口,把门关上; 3. 如果你是阿里云用户,尝试恢复ECS存储镜像,有多少算多少;但是有一定的风险是镜像恢复了,但数据文件可能是不完整的,用MongoDB的repair命令尝试修复,祈祷吧; 4. 碰碰运气,看看数据表是不是只是被rename了,因为数据dump是有时间成本和存储成本的,有监控数据的可以对比下看看容量有没有变化; 5. [**心有余孽的话就快点用阿里云MongoDB**;](https://www.aliyun.com/product/mongodb) 还是来用阿里云MongoDB吧 阿里云的云版MongoDB从设计之初就重点考虑了安全问题,所以整个服务提供上我们具备: 1. 基于MongoDB 3.2兼容版本的解决方案,并且强制要求鉴权,并且默认不提供公网入口。还在用2.X版本的同学们尽快升级吧,各种原因可以参考另一篇文章,[《告别 MongoDB 2.x 拥抱 3.x 版本的5大理由》](https://yq.aliyun.com/articles/67120?spm=5176.100240.searchblog.22.YA5jTQ); 2. 阿里云提供云MongoDB的VPC能力,即使在阿里云内网,也可以网络隔离; 3. 阿里云MongoDB提供白名单功能,只接受你自己的ECS访问; 4. 重点的来了哦,这可是自建没有能力,增量数据备份与恢复,哪怕被人删了,或者自己不小心drop了,可以恢复到任意时间的数据状态; 5. 重点的又来了,这可是官方企业版的能力,完整的审计日志,被谁删的,怎么删的,数据哪里去了,一清二楚; 6. 阿里云安全专家众多,“攻击者”也是看风险和收益比的; 另外,值得一提的是,因为PHP这种高级语言,对数据库的操作都是短连接,所以鉴权开启后会带来大量的性能损失(主要影响并发能力),阿里云MongoDB也在这方面做了优化处理,针对短连接场景,阿里云有10倍性能优化。同时,也提供上云在线迁移服务DTS,详情请访问官网了解。 最后再来个硬广,我们不趁火打劫,还大幅度降价,比自建成本更低的三节点集群服务,ApsaraDB产品线6折促销中,请猛烈点击我。
注:本案例来自MongoDB官方教程PPT,也是一个非常典型的CASE,故此翻译,并结合当前MongoDB版本做了一些内容上的更新。 本案例非常适合与IoT场景的数据采集,结合MongoDB的Sharding能力,文档数据结构等优点,可以非常好的解决物联网使用场景。 需求 案例背景是来自真实的业务,美国州际公路的流量统计。数据库需要提供的能力: 存储事件数据 提供分析查询能力 理想的平衡点: 内存使用 写入性能 读取分析性能 可以部署在常见的硬件平台上 几种建模方式 每个事件用一个独立的文档存储 { segId: "I80_mile23", speed: 63, ts: ISODate("2013-10-16T22:07:38.000-0500") } 非常“传统”的设计思路,每个事件都会写入一条同样的信息。多少的信息,就有多少条数据,数据量增长非常快。 数据采集操作全部是Insert语句; 每分钟的信息用一个独立的文档存储(存储平均值) { segId: "I80_mile23", speed_num: 18, speed_sum: 1134, ts: ISODate("2013-10-16T22:07:00.000-0500") } 对每分钟的平均速度计算非常友好(speed_sum/speed_num); 数据采集操作基本是Update语句; 数据精度降为一分钟; 每分钟的信息用一个独立的文档存储(秒级记录) { segId: "I80_mile23", speed: {0:63, 1:58, ... , 58:66, 59:64}, ts: ISODate("2013-10-16T22:07:00.000-0500") } 每秒的数据都存储在一个文档中; 数据采集操作基本是Update语句; 每小时的信息用一个独立的文档存储(秒级记录) { segId: "I80_mile23", speed: {0:63, 1:58, ... , 3598:54, 3599:55}, ts: ISODate("2013-10-16T22:00:00.000-0500") } 相比上面的方案更进一步,从分钟到小时: 每小时的数据都存储在一个文档中; 数据采集操作基本是Update语句; 更新最后一个时间点(第3599秒),需要3599次迭代(虽然是在同一个文档中) 进一步优化下: { segId: "I80_mile23", speed: { 0: {0:47, ..., 59:45}, ..., 59: {0:65, ... , 59:56} } ts: ISODate("2013-10-16T22:00:00.000-0500") } 用了嵌套的手法把秒级别的数据存储在小时数据里; 数据采集操作基本是Update语句; 更新最后一个时间点(第3599秒),需要59+59次迭代; 嵌套结构正是MongoDB的魅力所在,稍动脑筋把一维拆成二维,大幅度减少了迭代次数; 每个事件用一个独立的文档存储VS每分钟的信息用一个独立的文档存储 从写入上看:后者每次修改的数据量要小很多,并且在WiredTiger引擎下,同一个文档的修改一定时间窗口下是可以在内存中合并的;从读取上看:查询一个小时的数据,前者需要返回3600个文档,而后者只需要返回60个文档,效率上的差异显而易见;从索引上看:同样,因为稳定数量的大幅度减少,索引尺寸也是同比例降低的,并且segId,ts这样的冗余数据也会减少冗余。容量的降低意味着内存命中率的上升,也就是性能的提高; 每小时的信息用一个独立的文档存储VS每分钟的信息用一个独立的文档存储 从写入上看:因为WiredTiger是每分钟进行一次刷盘,所以每小时一个文档的方案,在这一个小时内要被反复的load到PageCache中,再刷盘;所以,综合来看后者相对更合理;从读取上看:前者的数据信息量较大,正常的业务请求未必需要这么多的数据,有很大一部分是浪费的;从索引上看:前者的索引更小,内存利用率更高; 总结 那么到底选择哪个方案更合理呢?从理论分析上可以看出,不管是小时存储,还是分钟存储,都是利用了MongoDB的信息聚合的能力。 每小时的信息用一个独立的文档存储:设计上较极端,优势劣势都很明显; 每分钟的信息用一个独立的文档存储:设计上较平衡,不会与业务期望偏差较大; 落实到现实的业务上,哪种是最优的?最好的解决方案就是根据自己的业务情况进行性能测试,以上的分析只是“理论”基础,给出“实践”的方向,但千万不可以此论断。 VS InfluxDB 说到时序存储需求,大家一定还会想到非常厉害的InfluxDB,InfluxDB针对时序数据做了很多特定的优化,但MongoDB采用聚合设计模式同样也可以大幅度较少数据尺寸。根据最新的测试报告,读取性能基本相当,压缩能力上InfluxDB领先MongoDB。但MongoDB的优势在于可以存储更丰富的信息,比如地理坐标,文本描述等等其他属性,业务场景上支持更广泛。另外,MongoDB的Sharding水平扩展能力,Aggragation功能,Spark Connector等等特性,对IoT来说,生态优势明显。
需求 社交类的APP需求,一般都会引入“朋友圈”功能,这个产品特性有一个非常重要的功能就是评论体系。先整理下需求: 这个APP希望点赞和评论信息都要包含头像信息: 点赞列表,点赞用户的昵称,头像; 评论列表,评论用户的昵称,头像; 数据查询则相对简单: 根据分享ID,批量的查询出10条分享里的所有评论内容; 建模 不好的 跟据上面的内容,先来一个非常非常"朴素"的设计: { "_id": 41, "username": "小白", "uid": "100000", "headurl": "http://xxx.yyy.cnd.com/123456ABCDE", "praise_list": [ "100010", "100011", "100012" ], "praise_ref_obj": { "100010": { "username": "小一", "headurl": "http://xxx.yyy.cnd.com/8087041AAA", "uid": "100010" }, "100011": { "username": "mayun", "headurl": "http://xxx.yyy.cnd.com/8087041AAB", "uid": "100011" }, "100012": { "username": "penglei", "headurl": "http://xxx.yyy.cnd.com/809999041AAA", "uid": "100012" } }, "comment_list": [ "100013", "100014" ], "comment_ref_obj": { "100013": { "username": "小二", "headurl": "http://xxx.yyy.cnd.com/80232041AAA", "uid": "100013", "msg": "good" }, "100014": { "username": "小三", "headurl": "http://xxx.yyy.cnd.com/11117041AAB", "uid": "100014", "msg": "bad" } } } 可以看到,comment_ref_obj与praise_ref_obj两个字段,有非常重的关系型数据库痕迹,猜测,这个系统之前应该是放在了普通的关系型数据库上,或者设计者被关系型数据库的影响较深。而在MongoDB这种文档型数据库里,实际上是没有必要这样去设计,这种建模只造成了多于的数据冗余。 另外一个问题是,url占用了非常多的信息空间,这点在压测的时候会有体现,带宽会过早的成为瓶颈。同样username信息也是如此,此类信息相对来说是全局稳定的,基本不会做变化。并且这类信息跟随评论一起在整个APP中流转,也无法结局”用户名修改“的需求。 根据这几个问题,重新做了优化的设计建议。 推荐的设计 { "_id": 41, "uid": "100000", "praise_uid_list": [ "100010", "100011", "100012" ], "comment_msg_list": [ { "100013": "good" }, { "100014": "bad" } ] } 对比可以看到,整个结构要小了几个数量级,并且可以发现url,usrname信息都全部不见了。那这样的需求应该如何去实现呢? 从业务抽象上来说,url,username这类信息实际上是非常稳定的,不会发生特别大的频繁变化。并且这两类信息实际上都应该是跟uid绑定的,每个uid含有指定的url,username。是最简单的key,value模型。所以,这类信息非常适合做一层缓存加速读取查询。 进一步的,每个人的好友基本上是有限的,头像,用户名等信息,甚至可以在APP层面进行缓存,也不会消耗移动端过多容量。但是反过来看,每次都到后段去读取,不但浪费了移动端的流量带宽,也加剧了电量消耗。 总结 MongoDB的文档模型固然强大,但绝对不是等同于关系型数据库的粗暴聚合,而是要考虑需求和业务,合理的设计。有些人在设计时,也会被文档模型误导,三七二十一一股脑的把信息塞到一个文档中。反而最后会带来各种使用问题。
需求 最近收到一个业务需求,需求是基于电影票售卖的不同渠道价格存储。某一个场次的电影,不同的销售渠道对应不同的价格。整理需求为: 数据字段: 场次信息; 播放影片信息; 渠道信息,与其对应的价格; 渠道数量最多几十个; 业务查询有两种: 根据电影场次,查询某一个渠道的价格; 根据渠道信息,查询对应的所有场次信息; 建模 不好的 我们先来看其中一种典型的不好建模设计: { "scheduleId": "0001", "movie": "你的名字", "price": { "gewala": 30, "maoyan": 50, "taopiao": 20 } } 数据表达上基本没有字段冗余,非常紧凑。再来看业务查询能力: 根据电影场次,查询某一个渠道的价格; 建立createIndex({scheduleId:1, movie:1})索引,虽然对price来说没有创建索引优化,但通过前面两个维度,已经可以定位到唯一的文档,查询效率上来说尚可; 根据渠道信息,查询对应的所有场次信息; 为了优化这种查询,需要对每个渠道分别建立索引,例如: createIndex({"price.gewala":1}) createIndex({"price.maoyan":1}) createIndex({"price.taopiao":1}) 但渠道会经常变化,并且为了支持此类查询,肯能需要创建几十个索引,对维护来说简直就是噩梦; 此设计行不通,否决。 一般般的设计 { "scheduleId": "0001", "movie": "你的名字", "channel": "gewala", "price": 30 } { "scheduleId": "0001", "movie": "你的名字", "channel": "maoyan", "price": 50 } { "scheduleId": "0001", "movie": "你的名字", "channel": "taopiao", "price": 20 } 与上面的方案相比,把整个存储对象结构进行了平铺展开,变成了一种表结构,传统的关系数据库多数采用这种类型的方案。信息表达上,把一个对象按照渠道维度拆成多个,其他的字段进行了冗余存储。如果业务需求再复杂点,造成的信息冗余膨胀非常巨大。膨胀后带来的副作用会有磁盘空间占用上升,内存命中率降低等缺点。对查询的处理呢: 根据电影场次,查询某一个渠道的价格; 建立createIndex({scheduleId:1, movie:1, channel:1})索引; 根据渠道信息,查询对应的所有场次信息; 建立createIndex({channel:1})索引; 更进一步的优化呢? 合理的设计 { "scheduleId": "0001", "movie": "你的名字", "provider": [ { "channel": "gewala", "price": 30 }, { "channel": "maoyan", "price": 50 }, { "channel": "taopiao", "price": 20 } ] } 注意看,这里使用了在MongoDB建模中非常容易忽略的结构--”数组“。查询方面的处理,是可以建立Multikey Index索引,详细信息可以参考官方文档。]说明 根据电影场次,查询某一个渠道的价格; 建立createIndex({scheduleId:1, movie:1, "provider.channel":1})索引; 根据渠道信息,查询对应的所有场次信息; 建立createIndex({"provider.channel":1})索引; 再通过explain来验证上面两个索引是否起到作用: db.movie.find({"scheduleId":"0001","movie":"你的名字", "provider.channel":"taopiao"}).explain() ...... "winningPlan": { "stage": "FETCH", "inputStage": { "stage": "IXSCAN", "keyPattern": { "scheduleId": 1, "movie": 1, "provider.channel": 1 }, "indexName": "scheduleId_1_movie_1_provider.channel_1", "isMultiKey": true, ...... db.movie.find({"provider.channel":"taopiao"}).explain() ...... "winningPlan": { "stage": "FETCH", "inputStage": { "stage": "IXSCAN", "keyPattern": { "provider.channel": 1 }, "indexName": "provider.channel_1", "isMultiKey": true, ...... 总结 这个案例并不复杂,需求也很清晰,但确实非常典型的MongoDB建模设计,开发人员在进行建模设计时经常也会受传统数据库的思路影响,沿用之前的思维惯性,而忽略了“文档”的价值。
连接是要消耗资源的,而且消耗的并不少。 内存:MongoDB为例,每个线程都要分配1MB的栈内存出来。1000个连接,1G内存就这么没了,甭管是否是活跃连接 文件句柄:每个连接都要打开一个文件句柄,当然从成本上讲,这个消耗相对内存是小了很多。但换个角度,文件句柄也被其他模块消耗着,比如WT存储引擎,就需要消耗大量的文件句柄 是否真的需要这么多的链接,一般的业务场景下请求压力在1000QPS左右,按照每个请求50ms计算,最多也就需要1000/(1000/50)==50个链接即可满足需求,并且是整个系统50个链接即可。 很多人平时没有怎么注意过链接数概念,上云后发现居然有这样的限制,心里很不舒服,可能非常不理解。这里说下常见的两种情况: 短链接:一般都是PHP环境,因为PHP的框架决定了PHP短链接的特性,并且链接数的需求一般是在1000-3000左右,具体多少还要根据业务部署的PHP数量来计算。并且MongoDB开源版本在短链接Auth处理上并不优雅,会消耗非常多的CPU资源,3000链接即可跑满24Core的CPU。PHP大拿Facebook也有同样的问题,所以他们用go语言自行开发了一套Proxy代理,来解决对MongoDB的短链接请求问题,但这毕竟带来部署成本和兼容性问题。阿里云的解决方案是从MongoDB源码优化下手,可以参考文章 长链接:比较健康合理的使用方式,但是也要正确的配置客户端,相关的参数为&maxPoolSize=xx 在ConnectionURI上追加上去即可,否则默认每个客户端就是高处100来个,平白的浪费资源 链接数的上限需要综合考虑性能,稳定性,业务需求。多方面去考虑,缺一不可。超低的内存,配置超高的链接数,得到的只能是OOM。 更多的关于连接数和如何正确配置的文章请参考MongoDB云数据库常见问题诊断
框架实现 FTS本质上也是Btree索引类型 索引AccessMethod定义: class FTSAccessMethod : public BtreeBasedAccessMethod 关键成员: fts::FTSSpec _ftsSpec; 获取索引的函数入口: void FTSAccessMethod::getKeys(const BSONObj& obj, BSONObjSet* keys) { ExpressionKeysPrivate::getFTSKeys(obj, _ftsSpec, keys); } 追踪到: fts::FTSIndexFormat::getKeys(ftsSpec, obj, keys); ftsSepc:是用来描述语言类型的类,定义的语言索引的一系列属性 obj:需要被索引的文档 keys:分词后产生的结果 FTSIndexFormat的实现: getKeys:找多文档所有命中的索引,FTS也支持组合索引,全文索引分词的建立通过函数FTSSpec::scoreDocument,注意:单个文档所有的索引加起来不能大于4MB,数量小于400000个。所以MongoDB的全文索引不实用于较大的文档。尤其是中文情况下,由于中文的复杂性,所以要格外注意,从性能角度上建议把文档控制在几百KB左右范围内。 getIndexKey:获取索引Key,VERSION2的算法对较长的Key进行了Hash压缩,算法是截取前32字节的前缀,后32字节用Murmurhash运算出来的数值代替,所以,最长不会超过64字节。 FTSSpec::scoreDocument,分词,然后统计频度,最后算法评分,算法原则是单词出现比例越大分数约高,单词的基数越大,造成的分数差距越小,算法大概如下: exp = 2^(count-1) freq = SIGMA(1/exp) coeff = (count + totalCount) / ( 2 * totalCount) score = weight * freq * coeff 分词实现 分词是搜索的基础,MongoDB目前只支持拉丁语系的分词处理,拉丁语的分词相对与中文简单很多,只需要有限的stop-word(比如the,a,空格等)即可完成一个套简单的分词功能。而中文博大精深,目前还没有非常好的开源库实现中文分词。 具体算法,先用stop-word进行文本切割,然后调用libstemmer三方库进行词干提取,最后得到文本分词。 挑战中文分词 经过一天的代码学习,总结下来,要实现中文分词首先要解决的是中文分词器,然后是字库。 有了两者的基础后,可以通过定义VERSION版本来自定一套分词算法,甚至是评分标准。
越来越多的应用采用MongoDB作为数据存储层,性能高,扩展性强,通过WriteCocern参数还可以控制写入持久级别,CAP上灵活配置。文档型的存储结构又是特别适合物联网,游戏等领域,这些数据也蕴藏这巨大的价值,就像是金矿一样,需要挖掘。虽然MongoDB提供了MapReduce功能,但功能相对薄弱,如果说MongoDB MapReduce是铁锹,Spark就是一台真正的挖掘机。 阿里云云数据库已经推出了MongoDB云服务,EMR(E-MapReduce)也是公测期,EMR提供了便捷的Spark服务,本篇文章将给大家介绍下如何使用使用阿里云服务,构建基于MongoDB的大数据计算平台。 EMR服务申请和创建 准备工作 钱,服务是要买的,学习为目的可以使用小时付费 提前开通OSS,EMR服务是依赖OSS的,所以建议提前开通OSS 申请EMR公测资格 点击申请地址,开通一般是在1-2个工作日左右,目前公测期间EMR服务的价格与ECS保持一致。长期使用可以按月购买,最小规模大概1000元左右,学习的话可以按小时付费,不过用好后请记得释放。 创建EMR集群 申请通过后就可以创建集群了,注意下运行日志的路径,需要指定一个OSS Bucket存放日志,为了方便追踪状态,建议开启。 输入好密码后就可以点击下一步了进行软件配置,默认选择Hadoop集群即可,继续下一步。因为EMR实际上是运行在ECS上,所以需要安全组配置,没有的话需要创建一个。另外,测试目的的话需要最小化集群配置,Core减小到一个节点,生产目的的话强烈建议多个Core。 继续,支付订单,等待集群创建,大概30秒后集群即可创建完毕。在ECS控制台上也可以看到新生产出的两个ECS节点,上面就运行着EMR服务,我们可以像使用普通ECS的方式一样登陆到节点上。 OK,至此Spark集群已经构建完成。 购买阿里云云数据库MongoDB 因为MongoDB已经是商业化的服务,所以正常购买即可,但需要注意的是,一定要购买与EMR服务在同一个可用区的实例,否则网络是不通的。 EMR可用区查看 MongoDB可用区选择 等待30S后查看控制台,MongoDB实例创建成功。 创建好后,先写上几条数据,为后面的DEMO做准备,如图: 检查网络连通性 开始之前还需要检查下EMR与MongoDB云服务的网络连通性,看看是否是畅通的。登陆到EMR创建好的ECS上,通过telnet命令来探测: telnet dds-xxxxxxx.mongodb.rds.aliyuncs.com 3717 如果发现无法连接有几个可能性逐一排查: EMR服务与MongoDB云服务不在同一个可用区,阿里云的网络规则下是不通的,需要重新购买 安全组限制了内网进出口,可以登陆ECS控制台修改安全组规则,让其可以访问MongoDB服务端口 由于欠费等原因,生长出来的实例被回收了,也可以通过控制台查看实例状态是否正常 至此,资源都已经Ready,接下来我们一起构建Spark 计算用的Jar包吧。 Spark任务编写 Jar包依赖 要想Spark访问MongoDB,必须找到相对应的Hadoop Connector和相关的Jar包,可以参考如下Maven POM配置。具体的版本,根据自己的实际需要去更新。 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.aliyun.mongodb</groupId> <artifactId>spark-test</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>fully.qualified.MainClass</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongodb-driver</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>org.mongodb.mongo-hadoop</groupId> <artifactId>mongo-hadoop-core</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0</version> </dependency> </dependencies> </project> Job编写 通过MongoDB控制台准备好MongoDB的几个属性: 两个访问地址,注意,是两个 用户名,密码,从MongoDB上拉取需要读权限,如果还希望数据写回MongoDB,那写权限也需要准备好 MongoDB集群名,以mgset开头 private static String DEFAULT_AUTH_DB = "admin"; private static String seed1 = "dds-xxxxx1.mongodb.rds.aliyuncs.com:3717"; private static String seed2 = "dds-xxxxx2.mongodb.rds.aliyuncs.com:3717"; private static String username = "root"; private static String password = "123456"; private static String replSetName = "mgset-1234567"; 接下来构建MongoDB ConnectionURI,具体的规则参考如下代码,参考github文档,或者跟着下面的代码抄写。最终要有三个URI: mongoURI 用来鉴权 inputURI 数据输入地址 ouputURI 数据输出地址 private static String authURIPrefix = "mongodb://" + username + ":" + password + "@" + seed1 + "," + seed2 + "/"; private static String authURISuffix = "?replicaSet=" + replSetName; private static String inputColl = "testdb.input"; private static String outputColl = "testdb.output"; private static String mongoURI = authURIPrefix + DEFAULT_AUTH_DB + authURISuffix; private static String inputURI = authURIPrefix + inputColl + authURISuffix; private static String outputURI = authURIPrefix + outputColl + authURISuffix; 至此,访问环境相关的变量都已经初始化完成,正式进入到Job内容,这里的Demo很简单,不能免俗的Hello World风格,但麻雀虽小五脏俱全,从配置到输入到计算再到输出,完整的一套流程。 首先撞见SparkContext,Spark作业的生命周期都会伴随着这个Context,并且配置Configuration对象,Configuration对象维护着上面提到的访问地址参数,更详细参数说明可以参考github。 JavaSparkContext sc = new JavaSparkContext(new SparkConf()); Configuration config = new Configuration(); config.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); config.set("mongo.job.output.format", "com.mongodb.hadoop.MongoOutputFormat"); config.set("mongo.auth.uri", mongoURI); config.set("mongo.input.uri", inputURI); config.set("mongo.output.uri", outputURI); 接下来轮到获取数据RDD了,RDD是Spark中的数据表达形式。这里要注意RDD Value类型,是BSONObject,BSON是MongoDB文档数据的表现形式。通过这样一条语句做了BSON到RDD的映射。 JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD( config, // Configuration MongoInputFormat.class, // InputFormat: read from a live cluster. Object.class, // Key class BSONObject.class // Value class ); 有了数据,就可以开始计算了,简单的做个mapValues动作,可以注意看,返回的仍然是个RDD,不过这个RDD是经过map动作处理后的。 JavaPairRDD<Object, BSONObject> updates = documents.mapValues(new MongoDBMapFunction()); mapFunction很简单,替换所有的name值为Spark,当然也可以做些统计的DEMO,后面的文章会再介绍更复杂的DEMO,敬请关注。 public class MongoDBMapFunction implements Function<BSONObject, BSONObject> { public BSONObject call(BSONObject bsonObject) throws Exception { bsonObject.put("name", "spark"); return bsonObject; } } 最后一步,数据的输出,MongoDB即是输入源又是输出源,所以第一个hdfs路径参数实际是无效的,但不可以是null,后面的类型描述了RDD的key,value类型,要跟updates一致,最后的config内容已经在程序最开始设置过了。 updates.saveAsNewAPIHadoopFile( "file://this-is-completely-unused", Object.class, BSONObject.class, MongoOutputFormat.class, config ); 额外说说明一下,Spark在动作是lazy的,整个代码流程下来,只有当程序执行到saveAsNewAPIHadoopFile时,才会触发数据拉取和计算等动作。 最后一步,构建Jar包,使用assembly的方式去构建,避免ClassNotFound的尴尬: mvn assembly:assembly 上传JAR包并执行 剩下的操作都不需要写代码了,只需要操作控制台即可。几个步骤:上传JAR包->创建作业->创建执行计划->执行,我们来实际操作下。 再次登陆到OSS控制台,把刚才Jar包上传到OSS上,后面会用到。再回到EMR控制台上的作业栏里创建一个作业,需要指定一些参数,只名Job Class,然后点击下面的按钮添加OSS路径,内容是就是刚才上传的Jar包地址。值得注意的是,这里用的是ossref前缀,遇到这样的前缀EMR服务会自动的从OSS拉取Jar下来,否则原生的Spark是不识别的。最后应用参数应该是如下样子: --master yarn-client --class com.aliyun.apsaradb.mongodb.Main ossref://sparkbucket/jar/spark-test-1.0-SNAPSHOT-jar-with-dependencies.jar 接下来是创建执行计划了,根据提示,在执行计划栏里进行创建,会提示采用的集群,作业集合,调度方式,这个DEMO采用的手动方式调度。 最后激动的时刻来临了,在执行计划栏里点击立即执行,运行过程和结束后都可以通过浏览器在网页上查看运行日志,非常方便。等待几十秒后,任务成功。 我们在回到DMS上查看数据集合,会发现已经多出了ouput集合,并且内容都为 { "name": "spark"} 至此,Spark与MongoDB的Hello World风格教程结束,各位可以发挥无限的想象力,玩的开心! 参考连接: https://docs.mongodb.com/ecosystem/tools/hadoop/ https://databricks.com/blog/2015/03/20/using-mongodb-with-spark.html
前言 Driver是MongoDB非常重要的组成部分,通过不同的配置实现Secondary访问;读写分离,动态感知集群容灾切换等功能。MongoDB目前已经覆盖了大部分的开发语言,常见的JAVA到Go,可以参考官方连接MongoDB Drivers。这篇文章我们以Java版本为例介绍MongoDB的Drivers实现逻辑和协议,选择Java的原因为语言阅读门槛低,配合IDE,进行DEBUG追踪也非常容易,其次Java版本实现的功能非常完善。主要讲解的是ReplicaSet模式,因为Shard模式有mongos作为代理,没有充分体现Driver的路由逻辑。 本篇讲解了以下三个部分: Driver初始化 Driver请求路由 Driver容灾处理 Driver初始化 配置启动参数DEMO 首先来看下Java初始话配置Demo: public static MongoClient createMongoDBClient() { // 构建Seed列表 List<ServerAddress> seedList = new ArrayList<ServerAddress>(); seedList.add(seed1); seedList.add(seed2); // 构建鉴权信息 List<MongoCredential> credentials = new ArrayList<MongoCredential>(); credentials.add(MongoCredential.createScramSha1Credential(username, DEFAULT_DB, password.toCharArray())); // 构建操作选项,requiredReplicaSetName属性外的选项根据自己的实际需求配置,默认参数满足大多数场景 MongoClientOptions options = MongoClientOptions.builder() .requiredReplicaSetName(ReplSetName).socketTimeout(2000) .connectionsPerHost(1).build(); return new MongoClient(seedList, credentials, options); } 还有另外一种通过URI初始化的方法: public static MongoClient createMongoDBClientWithURI() { //另一种通过URI初始化 //mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] MongoClientURI connectionString = new MongoClientURI("mongodb://" + username + ":" + password + "@" + seed1 + "," + seed2 + "/" + DEFAULT_DB + "?replicaSet=" + ReplSetName); return new MongoClient(connectionString); } 两种方式的实现原理么有本质区别,Driver会将URI拆解为第一种的参数设置方式。 URI方式的有点是在复杂的系统中更通用,不同编程语言系统可以共享一套初始化配置;如果有多套异构的系统访问MongoDB,建议采用URI的方式。 Builder方式更灵活,对参数的设置非常细,并且代码清晰易读。 设置Seed地址 仔细的读者会注意到,Address地址变量叫做Seed,而不是MongoDB集群上的Primary,Secondary,或者其他名称。Seed是MongoDB访问地址的一种抽象概念,可以是MongoDB集群中的任意一个节点,如果是ShardCluster模式,则应该是Mongos地址。这样的好处是使用者不需要关系集群的状态改变,更不用随着集群状态改变而修改配置。比如,增加了一个新的节点,再或者发生了Primary与Secondary之间的角色切换,但Seed却保持不变,只要是集群中的任意地址即可。然而Driver在处理请求时是需要明确Primary,Secondary等角色的地址关系。 构造ServerMonitor线程 所谓的初始化,就是对集群状态的第一次获取,并且构建Driver内部使用的ServerMonitor线程,集群中的每个节点,都对应有一个ServerMonitor。初始状态下会以Seed为起点开始,ServerMonitor主要有几下个动作: 首先检车连接状态,如果是不可用的,需要重新建立链接,并执行Action A动作: Action A:ConnectionOpen,每个连接的建立好都会执行以下的请求 ismaster 这里的作用相当于Ping一下,在这个Action中没有特别的作用 buildinfo 获取Server的版本信息,目的是做版本之间的兼容性 auth 如果配置有鉴权属性,会执行此步骤,目前默认的是SASL,可以参考RFC,SCRAM方式一般需要三次的RPC交互。 getlasterror 确定上面的请求没有出错,整个Action结束 连接Ready后,开始心跳,并且间隔性的检查,默认每10秒钟,可配置,但最小不能低于500毫秒 Action B:LookupServerDescription ismaster 与Action A的命令一样,但这里获取到的信息非常重要 检查上一步刚刚获得的Server Description与之前的是否一致,如果没有发生任何改变,状态稳定,则该轮心跳检查结束。不一致,则产生changeEven事件,调用Acton C。 如果有必要,则继续执行OnChanged Listener: Action C:Server Description OnChanged 更新Server Hosts列表,如果列表中之间不存在该地址,则新创建连接,以新地址构造ServerMonitor线程 如果当前节点是Primary,还要更新ElectionId等字段 在Action C动作结束后,每个Server都产生了一个Server Montior线程,并且Driver也同步到了集群状态,Primary地址,ElectionId等信息。一旦集群信息发生改变,心跳线程也会随时发现并做出相应的修改。 连接池ConnectionPool ServerMonitor是独立的线程,只保证心跳检查,不处理真正的用户请求。用户的请求是通过其他的连接完成,那对连接的管理是有个套ConnectionPool机制,每个连接有一套独立的ConnectionPool。初始化后ConnectionPool是空的,等请求需要时会在建立连接。 请求线程从Pool中拿到Connection后,会在请求线程里处理请求,结束后再放回给ConnectionPool。不同的实现语言,这里处理的逻辑并不一样,不属于MongoDB约定的协议范围内。 isMaster命令: 可以参考官方文档,isMaster,在MongoShell中执行会得到以下结果,非常重要的是ismaster和hosts字段,这两个字段完整的描述了集群的状态信息。Action B.2的判断,依赖ismaster的是否是true. { "setName" : "rs0", "setVersion" : 3, "ismaster" : true, "secondary" : false, "hosts" : [ "10.1.2.123:12300", "10.1.2.124:12300" ], "primary" : "10.1.2.123:12300", "me" : "10.1.2.123:12300", "electionId" : ObjectId("569dfe3aba498a31641f3630"), "maxBsonObjectSize" : 16777216, "maxMessageSizeBytes" : 48000000, "maxWriteBatchSize" : 1000, "localTime" : ISODate("2016-01-20T04:54:20.768Z"), "maxWireVersion" : 3, "minWireVersion" : 0, "ok" : 1 } 从整个流程上可以看到,ismaster命令存在冗余,一个连接的初始化逻辑,至少需要发送2次,并且对同一个Server,不同的连接还是要发送,很多情况不是必要的,可以简化,提高连接建立的速度。 MongoDB Driver初始化总结 通过对MongoDB Driver For Java的分析,我们已经很清楚了初始化的行为动作,包括平时运行时的大概情况,其他语言的实现也大同小异,跑不出这个流程。希望对读者日后处理客户端初始化问题时有所帮助。 了解原理后我们回头来思考下,怎么使用才是最佳实践,主要几点: 从整个流程上看,MongoDB Driver初始化的过程很‘重’,要交互很多次,尤其是使用PHP的同学注意了,不建议采用短连接的方式,请将MongoDB的连接持久化下来。 每个Driver都至少会有一个Monitor连接,而且是不会回收的。所以,规划连接数时,需要关注到这点。 提高请求线程并发量的同时,尝试同步提高连接池上限,对性能会有一定帮助。但请注意,不是越多越好,过多的线程会导致线程调度消耗过多的资源。而且请配置连接的IDLE time,让其自动回收长期不用的连接,避免连接泄漏。 最重要的一点,学会用isMaster命令排查问题,Driver请求无法访问时,尝试在Mongo Shell中执行isMaster,确保返回的集群信息都是正确的。并且isMaster是不需要鉴权的,所以,保护好你的MongoDB实例,不要出现的公网上。 配置多个Seed地址是,避免其中一个不可用。 ...... 待续,Driver请求路由
根据DB-Engines的排名统计,MongoDB综合排名第四(2016年1月数据,前三名分别是Oracle,MySQL,SQL Server),NoSQL领域(非RDBMS)里排名第一。尤其是在2015年里,一口气发布了两个大版本,3月发布了3.0版本,11月发布3.2版本。 在3.2版本中开始突破NoSQL的枷锁,提供了Join操作,可见其并不满足于NoSQL“独角兽”的称呼,目标早已定位成NewSQL,向RDBMS发起挑战。 但MongoDB毕竟不是RDBMS,甚至在数据建模上有很多相反的原则,比如MongoDB更提倡反范式化。而人的惯性思维作用,是更原因相信“传统”,比如MongoDB:逐渐变得无关紧要这篇文章,作者并不习惯于使用MongoDB的文档模型,并不肯定文档模型带来的开发效率的提升,而更依赖传统数据库结构带来的沉淀,我怀疑作者对MongoDB的数据建模上还是采用着范式化结构,所以才如此依赖多文档事务,并且对MongoDB带来强烈的不满。也没有关注到最新的MongoDB已经开始采用WiredTiger存储引擎,正式步入高性能数据库行列。 还有那篇著名的讨论“Don’t use MongoDB”,这篇文章发表于11年,原文很多内容并不属实(作者承认那篇文章只是个玩笑),而且10gen(MongoDB公司之前的名称是10gen) CTO对此还进行了回复,可是直到今天,很多人还是只知前者,甚至深信不疑,在社区中反复抛出同样的观点,比如:“MongoDB会丢数据”。 造成各种个样的误解,最大的原因还是“不了解”或者说MongoDB大过于神秘,不像MySQL,Oracle已经有大量的文章,大量的解惑教程,MySQL甚至还有各种的源码分析。MongoDB社区生态上还非常年轻,相关的技术文章或者书籍还不丰富,回顾我自己学习MongoDB的过程,一方面来自官方文档,另一方面来自各种博客书籍。但内容上大部分只是叫你HOW,而不是WHY,不能知其所以然。随着云服务的起步,运维模式的变化,出现了越来越多的DEV+DBA角色,仅仅会用API,也早已跟不上时代,更多的还要做到会管。比如什么时候要建立索引?查询语句是怎么执行的?哪里会有潜在的问题?这些都是作为数据操作者应该掌握的知识。但并不是每个人都有时间从源码上一点点学习。 就在前几天,阿里云作为中国第一的云计算公司,正式开始提供MongoDB SAAS服务。作为MongoDB生态圈中的一员,有义务做出一个更深入的教程,MongoDB用户可以通过文章提升自身的操作管理水平,开源爱好者也可以将该列文章作为源码阅读的辅助。 系列第一部分重点在“用”好MongoDB,第二部分重点是“管”好MongoDB,第三部分是案例的收集。
AliCloudDB MongoDB在开发过程中遇到一个无法正常退出的BUG,由于是Release版本的编译(-O3),debuginfo已经不能很好的展现出堆栈的情况。这时又该如何确定问题所在呢?本篇文章完整的记录了整个排查过程。 场景 kill命令正常执行,但进程迟迟没有退出。非必现场景,在批量回收资源时比较容易出现,平时开发测试时没有遇到。从场景上看出现的概率并不高,可能是在某种极端条件下才能触发,由于第一次收到报告后没有保留现场,先到官方JIRA平台上去搜相关的BUG,无果,又盲目的尝试了几个场景后只能先Hold住,等待下次出现。 排查 确认BUG方向 很幸运,第二天BUG再次出现,僵尸进程?死循环?死锁?没有收到Kill信号?无数想法蹦出来,迅速登陆机器,查看现场,先从最简单的可能性开始开始排查。 ps`top`第一套组合拳,排除了僵尸进程可能性,并且TOP显示CPU使用率为0并不高;strace继续跟进查看,也没有发现有系统调用,最后在补一个pstack打印堆栈信息,全部线程都在wait。BUG的排查方向大致确定:线程间资源同步的问题(当然也不能排除是其他的可能性)。 确认代码范围 详细分析pstack内容,从堆栈信息中看一个长相特别的(其他大部分的线程堆栈都是雷同的): Thread 46 (Thread 0x2b5f079be700 (LWP 613)): #0 0x000000301800b43c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0 #1 0x0000000000fec763 in boost::thread::join() () #2 0x0000000000c8eebc in mongo::repl::ReplicationCoordinatorExternalStateImpl::shutdown() () #3 0x0000000000c93fed in mongo::repl::ReplicationCoordinatorImpl::shutdown() () #4 0x0000000000ad2463 in mongo::exitCleanly(mongo::ExitCode) () #5 0x0000000000f9c176 in ?? () #6 0x0000000000feb384 in ?? () #7 0x0000003018007851 in start_thread () from /lib64/libpthread.so.0 #8 0x000000300a4e767d in clone () from /lib64/libc.so.6 从函数名上看起来是MongoDB退出的关键路径,就从这里入手,人肉翻下源码: 97 void ReplicationCoordinatorExternalStateImpl::shutdown() { 98 boost::lock_guard<boost::mutex> lk(_threadMutex); 99 if (_startedThreads) { 100 log() << "Stopping replication applier threads"; 101 _syncSourceFeedback.shutdown(); 102 _syncSourceFeedbackThread->join(); 103 _applierThread->join(); 104 BackgroundSync* bgsync = BackgroundSync::get(); 105 bgsync->shutdown(); 106 _producerThread->join(); 107 } 108 } 这么多的Join,到底是哪个呢。上GDB,我们来看看Thread 46到底在等谁。先加载symbol-file,失败,加载后堆栈变得更乱了,换disassemble命令,显示汇编信息: 0x0000000000c8ee6b <+75>: mov %rsp,%rdi 0x0000000000c8ee6e <+78>: callq 0xdc8670 <_ZN5mongo6logger16LogstreamBuilder10makeStreamEv> 0x0000000000c8ee73 <+83>: mov 0x20(%rsp),%rdi 0x0000000000c8ee78 <+88>: lea 0x86e299(%rip),%rsi # 0x14fd118 0x0000000000c8ee7f <+95>: mov $0x24,%edx 0x0000000000c8ee84 <+100>: callq 0x1456550 <_ZSt16__ostream_insertIcSt11char_traitsIcEERSt13basic_ostreamIT_T0_ES6_PKS3_l> 0x0000000000c8ee89 <+105>: mov %rsp,%rdi 0x0000000000c8ee8c <+108>: callq 0xdc88d0 <_ZN5mongo6logger16LogstreamBuilderD2Ev> 0x0000000000c8ee91 <+113>: lea 0x38(%rbp),%rdi 0x0000000000c8ee95 <+117>: callq 0xcce810 <_ZN5mongo4repl18SyncSourceFeedback8shutdownEv> 0x0000000000c8ee9a <+122>: mov 0xe8(%rbp),%rdi 0x0000000000c8eea1 <+129>: test %rdi,%rdi 0x0000000000c8eea4 <+132>: je 0xc8ef14 <_ZN5mongo4repl39ReplicationCoordinatorExternalStateImpl8shutdownEv+244> 0x0000000000c8eea6 <+134>: callq 0xfec600 <_ZN5boost6thread4joinEv> 0x0000000000c8eeab <+139>: mov 0xf0(%rbp),%rdi 0x0000000000c8eeb2 <+146>: test %rdi,%rdi 0x0000000000c8eeb5 <+149>: je 0xc8ef14 <_ZN5mongo4repl39ReplicationCoordinatorExternalStateImpl8shutdownEv+244> 0x0000000000c8eeb7 <+151>: callq 0xfec600 <_ZN5boost6thread4joinEv> => 0x0000000000c8eebc <+156>: callq 0xc284a0 <_ZN5mongo4repl14BackgroundSync3getEv> 0x0000000000c8eec1 <+161>: mov %rax,%rdi 0x0000000000c8eec4 <+164>: callq 0xc27f60 <_ZN5mongo4repl14BackgroundSync8shutdownEv> 0x0000000000c8eec9 <+169>: mov 0xf8(%rbp),%rdi 0x0000000000c8eed0 <+176>: test %rdi,%rdi 0x0000000000c8eed3 <+179>: je 0xc8ef14 <_ZN5mongo4repl39ReplicationCoordinatorExternalStateImpl8shutdownEv+244> 0x0000000000c8eed5 <+181>: callq 0xfec600 <_ZN5boost6thread4joinEv> 0x0000000000c8eeda <+186>: nopw 0x0(%rax,%rax,1) 0x0000000000c8eee0 <+192>: mov %rbx,%rdi 0x0000000000c8eee3 <+195>: callq 0x804a38 <pthread_mutex_unlock@plt> 0x0000000000c8eee8 <+200>: cmp $0x4,%eax 0x0000000000c8eeeb <+203>: je 0xc8eee0 <_ZN5mongo4repl39ReplicationCoordinatorExternalStateImpl8shutdownEv+192> 0x0000000000c8eeed <+205>: test %eax,%eax 0x0000000000c8eeef <+207>: jne 0xc8ef0f <_ZN5mongo4repl39ReplicationCoordinatorExternalStateImpl8shutdownEv+239> 0x0000000000c8eef1 <+209>: add $0x38,%rsp 0x0000000000c8eef5 <+213>: pop %rbx 0x0000000000c8eef6 <+214>: pop %rbp 0x0000000000c8eef7 <+215>: pop %r12 0x0000000000c8eef9 <+217>: pop %r13 0x0000000000c8eefb <+219>: retq 查看0x0000000000c8eeb7地址的上下文,通过前后指令的函数符号名确定了目前代码是在_applierThread->join(),这里可以思考下是否还有的其他方法获得代码行。 _applierThread同样也是个线程,如果shutdown在等它,那它又在等谁呢,回头继续查pstack输出,找到相关的堆栈: Thread 34 (Thread 0x2b5f0a6d8700 (LWP 1355)): #0 0x000000301800b43c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0 #1 0x0000000000c2687b in mongo::repl::BackgroundSync::waitUntilPaused() () #2 0x0000000000cd192e in mongo::repl::SyncTail::tryPopAndWaitForMore(mongo::OperationContext*, mongo::repl::SyncTail::OpQueue*, mongo::repl::ReplicationCoordinator*) () #3 0x0000000000cd2823 in mongo::repl::SyncTail::oplogApplication() () #4 0x0000000000ccaaaf in mongo::repl::runSyncThread() () #5 0x0000000000feb384 in ?? () #6 0x0000003018007851 in start_thread () from /lib64/libpthread.so.0 #7 0x000000300a4e767d in clone () from /lib64/libc.so.6 注意这里与shutdown的等待是不同的,shutdown是在等待线程退出,而这里是在等待某个条件变量,再次上GDB,查看Thread 34 & backtrace 1, info locals: _lastOpTimeFetched = {i = 4, secs = 1448986976}, _lastAppliedHash = 3485900536827542548, _lastFetchedHash = 3485900536827542548, _pause = false, _pausedCondition = {internal_mutex = { __data = {__lock = 0, __count = 0, __owner = 0, __nusers = 1, __kind = 0, __spins = 0, __list = {__prev = 0x0, __next = 0x0} }, __size = '\000' <repeats 12 times>, "\001", '\000' <repeats 26 times>, __align = 0 } 找到代码BUG 看看代码怎么写的吧: 469 void BackgroundSync::waitUntilPaused() { 470 boost::unique_lock<boost::mutex> lock(_mutex); 471 while (!_pause) { 472 _pausedCondition.wait(lock); 473 } 474 } _pause变量一直都是0,所以会Hang在这里。继续查看代码,跟踪_pausedCondition的调用,有两个函数会去唤醒,一个是stop,另一个是shutdown,但shutdown的调用应该在线程退后调用,以便释放资源。 同时,再次回过来在pstack中查看BackgroundSync的堆栈信息,想看看到底卡在了哪里。结果找不到BackgroundSync线程,问题更清晰了,所有_pausedCondition条件变量的唤醒,都是在该线程中完成的,如果BackgroundSync已经不存在了,一定会hang住。 再反复阅读代码,BackgroundSync在判断到inShutdown()时会自动结束生命周期,但结束后并没有更改_pause状态。 FIX BUG 解决办法是线程最后退出前执行stop函数,修改_pause状态,(shutdown会提前释放资源),查看官方最最新代码,确认有同样的修改,反向追踪提交,找到相关JIRA,抱怨JIRA的搜索弱爆了。 复现和验证 为何该BUG出现的频率会非常低呢,主要在判断是否等待的条件上: if (replCoord->isWaitingForApplierToDrain()) { 506 BackgroundSync::get()->waitUntilPaused(); 507 if (peek(&op)) { 508 // The producer generated a last batch of ops before pausing so return 509 // false so that we'll come back and apply them before signaling the drain 510 // is complete. 511 return false; 512 } 513 replCoord->signalDrainComplete(txn); 514 } 1908 case kActionWinElection: { 1909 boost::unique_lock<boost::mutex> lk(_mutex); 1910 _electionId = OID::gen(); 1911 _topCoord->processWinElection(_electionId, getNextGlobalOptime()); 1912 _isWaitingForDrainToComplete = true; 1913 const PostMemberStateUpdateAction nextAction = 1914 _updateMemberStateFromTopologyCoordinator_inlock(); 1915 invariant(nextAction != kActionWinElection); 1916 lk.unlock(); 1917 _performPostMemberStateUpdateAction(nextAction); 1918 break; 1919 } 也就是说刚刚赢得了选举后会产生_isWaitingForDrainToComplete == true状态,恰巧这个时间窗口内进程接受到退出信号。复现的办法就是在kActionWinElection 的后面加上sleep,以此来延长时间窗口,当然也可以通过GDB BLOCK的方式来复现。
前言 受到中文社区《电商参考架构第二部分:库存优化方法》启发,想到了去年做过类似的电影票预定系统,如果用MongoDB去做存储支撑,那应该是怎样架构的呢?本文的目的是为了更好的学习掌握MongoDB,所以某些设计上更偏向于功能的展示,在实际使用上要因地制宜的改变,合适才是最好的。 需求 电影票预定系统与电商系统非常类似,都可以抽象理解为商品的售卖。进一步的讲电影票系统是电商系统的一个库存特例场景: 每个场次,每个座位,都只有一个库存 每个订单所预定的座位有锁定状态,在支付前对应的作为不能被再次购买 订单涉及到的座位要不全成功,要不全失败 “全国”级的,数据容量不是太大问题,但性能上要支持水平扩展 PS:实际上的理论TPS并不高,目前全国5000家影院,假设平均8个影厅,每个厅200个位置,每个影厅6个场次,早中晚各3个高峰,每个高峰1个小时。计算得出TPS大概是:5000 8 6 * 200/ 3 / 3600 = 4400 TPS;但是设计上我们还是要保证性能的可水平扩展,否则怎么体现MongoDB的特色呢?^-^ 描述信息文档结构 影院描述信息 保存最基本的影院信息,包括地理信息,名称,_id为MongoDB由MongoDB自动分配 CinemaManager.cinema_detail { _id: <ObjectID>, name: "<cinema name>", city: "<city name>" location: [<longitude>, <latitude>], comments: "<detail message>" } 例如: rs0:PRIMARY> db.cinema_detail.insert({ "name" : "大时代电影院", "city" : "杭州", "location" : [ 120.13, 30.16 ], "comments" : "IMAX 4K,有停车位" }); 因为影院信息的查询一般都是按照城市和名称,或者地理坐标检索,所以这里建立两个索引 Index1:城市+名称的复合索引,因为查询电影院时一般都会指定城市名 rs0:PRIMARY> db.cinema_detail.ensureIndex({city:1, name:1}) { "createdCollectionAutomatically" : false, "numIndexesBefore" : 2, "numIndexesAfter" : 2, "ok" : 1 } 注意,这里使用的是复合索引,所以针对 city + name的查询,或者city的查询是有效的,只查找name字段是无法通过索引优化的。 Index2:地理坐标索引,用来应付"最近的电影院"类查询 rs0:PRIMARY> db.cinema_detail.ensureIndex({location: "2d"}) { "createdCollectionAutomatically" : false, "numIndexesBefore" : 3, "numIndexesAfter" : 4, "ok" : 1 } 例如,查询在杭州最近的某个电影院 rs0:PRIMARY> db.cinema_detail.find({city:"杭州", location: { $near: [1.0, 2.0] }}).pretty() { "_id" : ObjectId("559a3ef8c6058dae1ac49ce8"), "name" : "大时代电影院", "city" : "杭州", "location" : [ 120.13, 30.16 ], "comments" : "IMAX 4K,有停车位" } 影厅描述信息 theater_detail.cinema_id与cinema_detail._id集合形成references关系,通过cinema_detail._id可以快速找到所属影厅的信息。另一个关键字段theater_detail.seat用来描述座位信息,每排所有的座位是一个数组,不同排可以有不同数量的座位。 CinemaManager.theater_detail { _id: <ObjectID>, cinema_id: <ObjectID(cinema_detail._id)>, name: <theater name>, seat: { row1: [<seat valid>], row2: [<seat valid>], row3: [<seat valid>], <seat row>: [<seat valid>] } comments: "<detail message>" } rs0:PRIMARY> db.theater_detail.insert({ cinema_id:ObjectId("559a3ef8c6058dae1ac49ce8"), name:"IMAX厅", seat: { row1: [1, 1, 1, 1], row2: [1, 1, 1], row3: [1, 1, 1, 1], row4: [1, 1, 1, 1, 1], }, comments: "可容纳哦xxx人,弧形荧幕" }) rs0:PRIMARY> db.theater_detail.insert({ cinema_id:ObjectId("559a3ef8c6058dae1ac49ce8"), name:"中国巨幕厅", seat: { row1: [1, 1, 1, 1], row2: [1, 1, 1], row3: [1, 1, 1, 1] }, comments: "可容纳哦xxx人,弧形荧幕" }) 建立索引 rs0:PRIMARY> db.theater_detail.ensureIndex({cinema_id:1}) { "createdCollectionAutomatically" : false, "numIndexesBefore" : 1, "numIndexesAfter" : 2, "ok" : 1 } 影片描述信息 影片说明 { _id: <ObjectID>, name: "<movie name>", director: "director name" actor: [<actor name>] comments: "<detail message>" } rs0:PRIMARY> db.movie_detail.insert({ name: "一路向西", director: "胡耀辉", actor:["张建声", "王宗尧", "胡耀辉", "何佩瑜", "张暖雅", "郭颖儿"], comments: "该影片描写的是当代香港社会中普通年轻人对“爱”与“性”的追求而逐渐改变的心路历程的故事" }) 索引 rs0:PRIMARY> db.movie_detail.ensureIndex({name:1}) { "createdCollectionAutomatically" : false, "numIndexesBefore" : 1, "numIndexesAfter" : 2, "ok" : 1 } 影片放映文档结构 放映信息包含放映时间段,放映影厅,票价。虽然Document结构可以做复杂的嵌套,但原则上期望Document尽量小,利用数据Shard,性能优化。所以在movie_schedule的设计上每个影片的每场放映独立一个Document表达。 { _id: <ObjectID>, cinema_id: <ObjectID(cinema_detail._id)> movie_id: <ObjectID(movie_detail._id)>, theater_id: <ObjectID(theater_detail._id)>, start_time: <ISODate>, end_time: <ISODate>, comments: "<detail message>" } movie_schedule的References关系较多,需要与电影院,影厅,电影三者分别建立关系。 db.movie_schedule.insert({ cinema_id:ObjectId("559a3ef8c6058dae1ac49ce8"), movie_id:ObjectId("559b68f372b34f216246cb1d"), theater_id:ObjectId("559b625072b34f216246cb1b"), start_time: ISODate("2015-07-07T10:00:00.00Z"), end_time: ISODate("2015-07-07T12:00:00.000Z"), comments: "首映" )} db.movie_schedule.insert({ cinema_id:ObjectId("559a3ef8c6058dae1ac49ce8"), movie_id:ObjectId("559b68f372b34f216246cb1d"), theater_id:ObjectId("559b625072b34f216246cb1b"), start_time: ISODate("2015-07-07T12:30:00.00Z"), end_time: ISODate("2015-07-07T14:30:00.000Z"), comments: "" )} 还是建立一个复合索引,优化查询某一电影院的某部影片(的某一影厅)上映信息 rs0:PRIMARY> db.movie_schedule.ensureIndex({cinema_id:1, movie_id:1, theater_id:1}) { "createdCollectionAutomatically" : false, "numIndexesBefore" : 1, "numIndexesAfter" : 2, "ok" : 1 } PS:也可以建立相应的索引,用来优化某一时间段内的影片信息查询,读者自行思考 交易系统 至此,基本的信息文档集合均已建立完成,一般的查询需求都可以满足了。接下来是重点:库存售卖系统。抽象的来看,售卖系统就是对上诉所有集合的一个整合,外加一套库存字段。我们认为一场放映就是一个主商品,每个座位可以认为是这个商品的SKU,每个SKU都是1份。 通过Reference关系结合movie_schedule与theater_detail,注意这里引用了 { _id: <ObjectID>, movie_schedule_id: <ObjectID(movie_schedule._id)> theater_id: <ObjectID(theater_detail._id)>, seat: { row1: [2, 2, 2, 2], row2: [2, 2, 2], row3: [2, 2, 2, 2], row4: [2, 2, 2, 2, 2], } } 注意,这里不仅仅是Reference的引用关系,还复制了theater_detail.seat字段,每个seat都有一个库存数字,因为在MongoDB中一个Document的操作是可以保证原子的,不需要对Collection加任何锁。数字2并不是表示可以卖2次: 数字2表示,可销售 数字1表示,已锁定 数字0表示,已售完 交易逻辑上可通过FindAndModify + $inc,原子性的修改库存信息。其他的描述信息是否需要再次冗余取决于具体的业务状况了,具体问题具体分析。我本人更倾向于目前的数据结构方案,不做过多的冗余,原因: 数据订正复杂,多一个冗余,多一份复杂 其他信息基本都是静态数据,数据量又小,完全可以通过Cache技术解决读取问题 先插入一个我们的商品 db.movie_item.insert({ movie_schedule_id : ObjectId("559b6ee472b34f216246cb1e"), theater_id : ObjectId("559b625072b34f216246cb1b"), seat : { row1: [2, 2, 2, 2], row2: [2, 2, 2], row3: [2, 2, 2, 2], row4: [2, 2, 2, 2, 2], } }) 索引 rs0:PRIMARY> db.movie_item.ensureIndex({movie_schedule_id:1}) { "createdCollectionAutomatically" : false, "numIndexesBefore" : 1, "numIndexesAfter" : 2, "ok" : 1 } 锁定座位的动作,锁定第4排的3号位置(从1开始计数)和锁定第4排的2号位置: db.movie_item.findAndModify({ query: { "_id":ObjectId("559b790f72b34f216246cb22"), "seat.row4.2":2 }, update: { $inc: {"seat.row4.2":-1}}, upsert: false }) db.movie_item.findAndModify({ query: { "_id":ObjectId("559b790f72b34f216246cb22"), "seat.row4.1":2 }, update: { $inc: {"seat.row4.1":-1}}, upsert: false }) 分别锁定了第4排3号(row4[2]),第4排2号(row4[1]),注意,这里是分两次锁定的,锁定操作并不需要原子完成,否则会造成用户锁定失败概率的上升。 rs0:PRIMARY> db.movie_item.find({_id:ObjectId("559b790f72b34f216246cb22")}).pretty() { "_id" : ObjectId("559b790f72b34f216246cb22"), "movie_schedule_id" : ObjectId("559b6ee472b34f216246cb1e"), "theater_id" : ObjectId("559b625072b34f216246cb1b"), "seat" : { "row1" : [ 2, 2, 2, 2 ], "row2" : [ 2, 2, 2 ], "row3" : [ 2, 2, 2, 2 ], "row4" : [ 2, 1, 1, 2, 2 ] } } OK,交易成功以此类推,同时修改两个库存到0,这里利用了findAndModify的原子特性 db.movie_item.findAndModify({ query: { _id:ObjectId("559b790f72b34f216246cb22"), $and:[ {"seat.row4.2":1}, {"seat.row4.1":1}] }, update: { $inc: {"seat.row4.2":-1, "seat.row4.1":-1} }, upsert: false }) 再查下集合看看: rs0:PRIMARY> db.movie_item.find({_id:ObjectId("559b790f72b34f216246cb22")}).pretty() { "_id" : ObjectId("559b790f72b34f216246cb22"), "movie_schedule_id" : ObjectId("559b6ee472b34f216246cb1e"), "theater_id" : ObjectId("559b625072b34f216246cb1b"), "seat" : { "row1" : [ 2, 2, 2, 2 ], "row2" : [ 2, 2, 2 ], "row3" : [ 2, 2, 2, 2 ], "row4" : [ 2, 0, 0, 2, 2 ] } } 总结 一套全国级的电影票系统会比这复杂的多,本文的目的还是以教程为主,主要是说明MongoDB如何构建一个电影票系统,但距离生长系统还是有一定的距离,仍有很多其他的技术点需要讨论,可以延伸开的还有,下单失败,过期未付款,数据唯一性等问题。
Replication是MongoDB一套非常复杂的功能,功能包括数据同步,选举,心跳维护等。涉及到与其他MongoD进程通讯。RPC的封装相对也比较重要,作为这些功能实现的基础。 Replication相关的对象角色都被封装在ReplicationCoordinatorImpl对象中: repl::ReplicationCoordinatorImpl* replCoord = new repl::ReplicationCoordinatorImpl( getGlobalReplSettings(), new repl::ReplicationCoordinatorExternalStateImpl, new repl::NetworkInterfaceImpl, new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)), static_cast<int64_t>(curTimeMillis64())); 本篇先从网路部分开始介绍NetworkInterfaceImpl。 ReplicationExecutor::NetworkInterface 接口,定义了startCommand cancelCommand runCallbackWithGlobalExclusiveLock等方法,抽象Command的处理逻辑。同时还有start wait等一些线程管理接口,重点关注在command和callback相关的实现。 首先来说明下连接池的管理: NetworkInterfaceImpl::ConnectionPool::ConnectionInfo 集成DBClientConnection外,还有一个creationDate成员,记录连接创建的时间,后面会根据这个时间来消耗过期的Connection。 NetworkInterfaceImpl::ConnectionPool::ConnectionPtr 保存的并不是Connection指针,而是一个list::iterator。构造该对象时,只需要指定HostAddress即可,而不是需要真正的Connection对象等。ConnectionPtr会根据HostAddress从Pool中获取(acquireConnection)相应的迭代器。 NetworkInterfaceImpl::ConnectionPool 核心类,连接池管理,不同与一般的连接池只维护同一个地址的多个连接。此连接池维护的是不同的地址,对应的多个连接,所有的空闲连接都存放在_connections成员中,使用中的连接存放在_inUseConnections,两个成员都被_mutex保护。调用者只要使用两个函数acquireConnection和releaseConnection,需要连接时调用acquireConnection,使用结束后通过releaseConnection归还。 acquireConnection 作用是获取Connection,如果Pool中没有,则创建一个Connection。首先从刚才说的_connections中根据参数传入的HostAddress查找到相应的ConnectionList。 获取到ConnectionList后,先会去尝试清理掉过期的Connection(参考cleanUpOlderThan_inlock实现)。然后取出ConnectionList中的第一个Connection放入到_inUseConnections中。 如果没有找到相应HostAddress的连接,或者连接过期被释放,甚至连接已经不可用。则重新构建连接,并且发送鉴权信息,保证连接的合法性,且可用。 cleanUpOlderThan_inlock 这里的处理比较暴力,个人认为并不是非常合理。简单讲就是找到从创建开始30秒的链接去杀掉销毁。如果能修改成只销毁长时间没有被使用过的链接,效果会更好。明明是在长期使用的连接,也因为30秒的问题而被销毁。 Sock::isStillConnected 判断连接是否可用的,采用了socket pool的方式来判断,用非阻塞的方式查看Socket的POLLIN事件,如果Socket中存在数据,或者任何其他的错误事件被触发,则说明连接状态不可用,返回False。 Sock::connect 因为connect timeout是受syncookies影响,timeout时间会非常的久,所以要在创建另一个线程中进行connect操作,原线程wait指定的时间,超时则放弃。这里的实现比较重,需要开辟新的线程,比较好的做法是使用epoll非阻塞的方法,可以参考解决方案Stackoverflow。 releaseConnection 从_inUseConnections再放回到_connections中,在List中的位置是头部。 NetworkInterfaceImpl 现在可以回头来继续说明NetworkInterfaceImpl。 NetworkInterfaceImpl虽然通过ConnectionPool管理了Socket,自身则来管理了一个线程池,线程数量最多51,最少也维护了1个线程。command任务都提交到_pending成员中,工作线程从中获取到Command任务对象来执行,生产者消费者模式。工作线程的最大空闲时间也是30S,如果30S时间内,线程空闲数量小于_peding数量,则自动销毁资源。 每个提交上来的任务,都有一个cbHandle,作为Callback的句柄参数。onFinish任务处理完成,或者被Cancel的话,都会调用。对于Cancel需要说明,只能取消处于pending状态的任务。 _runCommand 有了前面的ConnectionPool,这里就简单了很多,只需要从中拿到连接对象,发送Command消息即可。如果异常情况出现,比如连接失败,则通过捕获异常来反馈。虽然ConnectionPool没有对连接数量做出上限控制,但NetworkInterfaceImpl控制了线程数量,所以连接数基本也是可控的。理论来说最多的连接数量等于地址数量*51,但实际情况远达不到。
MongoDB的索引代码实现--BtreeBasedAccessMethod 前言 学习开源软件的最好的办法就是阅读代码,MongoDB整个代码库有接近90万行代码,DB核心层大概50万行,代码量确实非常多。本文作为MongoDB代码导读的第一篇,从Index部分上入手分析代码实现。为何从索引部分开始介绍,首先代码量较少,总共5000多行,且相对其他模块来说比较独立;其次索引对数据库的优化至关重要,了解其实现,对日后的运维优化和索引自身的限制约定都具有实际意义。毕竟文档上的描述还是没有代码来的准确。本文没有逐行的去分析源码,一来工作量太大,二来也没有办法完全揣测出作者的意图,莫不如只描述大概,剩下的留给阅读者自己思考。 代码导读 代码集中在src/db/index目录下,里面并没有涉及具体的btree数据结构,而是描述了一些操作方法,索引解析等。MongoDB的代码实现上涉及到了比较多的设计模式,比如Builder,Combination,Strategy,Factory等。如图,这部分代码主要采用的是Combination和Adapter模式,整个代码都围绕着BtreeBasedAccessMethod。 index_descriptor.h[cpp] 构造函数和初始化成员 _magic_code,通过magic检查一定程度上避免内存错误,野指针等造成的数据错乱。 _collection,所属的Collection _infoObj,配置管理信息 _numFields,索引字段数量,大于一个就是联合索引 _keyPattern,索引匹配串 _parentNS,namespace _isIdIndex,ID索引,如果索引字段只是_id则认为是IDIndex _indexName,索引的名字,用户可以指定 _sparse,是否是稀疏索引,稀疏索引不包含NULL字段 _unique,是否是唯一索引 _indexNamespace,索引的namespace,parentNs.$name _version,数据结构版本号,0和1两个,参看btree_key_generator.h[cpp] _cachedEntry,看到再说 areIndexOptionsEquivalent 检查是否是稀疏索引,是否是_ID索引,并且是否是唯一索引,注释里也说明了,不关心ID索引的排序顺序,因为升序或者降序,对查询来说,都是等价的,最后比较Options,注意version和ns是不比较的,background是创建时的参数,创建后就不再有意义 index_cursor.h 定义了一些游标接口方法,并声明了Yield逻辑,注释写的很清晰: /** * Yielding semantics: * If the entry that a cursor points at is not deleted during a yield, the cursor will * point at that entry after a restore. * An entry inserted during a yield may or may not be returned by an in-progress scan. * An entry deleted during a yield may or may not be returned by an in-progress scan. * An entry modified during a yield may or may not be returned by an in-progress scan. * An entry that is not inserted or deleted during a yield will be returned, and only once. * If the index returns entries in a given order (Btree), this order will be mantained even * if the entry corresponding to a saved position is deleted during a yield. */ 如果游标指向的元素在Yield期间没有被删除,那么在恢复后仍然指向这个位置。在Yield时,插入,删除,或者修改的元素,是否会被游标发现是不确定的。 因为考虑的效率问题,游标可能会Cache一批数据,这部分数据不会与原始的数据同步。同样,Yield时没有加入或者删除的元素,都会被返回,且只返回一次。 元素的返回顺序应该按照指定的顺序返回(升序或者降序),甚至是在Yield期间,指向的元素被删除了也要维持原有的顺序返回。 index_access_method.h[cpp] 操作接口,CRUD方法,注意是非线程安全的。对调用者来说,需要考虑底层的所以结构,接口的行为是不透明的。注释比较简单,不翻译了: /** * An IndexAccessMethod is the interface through which all the mutation, lookup, and * traversal of index entries is done. The class is designed so that the underlying index * data structure is opaque to the caller. * * IndexAccessMethods for existing indices are obtained through the system catalog. * * We assume the caller has whatever locks required. This interface is not thread safe. * */ btree_based_access_method.h[cpp] 继承自IndexAccessMethod,声明getKeys,交给由子类实现,是个Adapter模式。 insert通过getKeys获得doc的所有key,然后进行迭代修改Index,修改成功后,会对numInserted++,计数,统计本次操作修改索引次数。如果遇到失败情况,则要清理所有之前已经写入的数据,保证操作的原子性,即使失败,也要保证恢复到操作之前的数据状态(索引结构可能会发生变化,但是数据集合是等价的)。 另外,background build index,可能会重复插入索引,因为doc数据可能会在磁盘上移动,也就是会被重复扫描到。 removeremove比insert就简单多了,因为remove是不可能失败的 find找到后返回RecordID update对比先后两个对象,计算出diff,包括需要删除的和需要增加的,然后批量提交修改。注意,这里可能会失败,但是失败后索引没有再回滚到之前的数据集合。并且是先删除后增加,极端情况下会导致索引错误。思考:如果是先增加后删除,是不是更合适一点? btree_based_bulk_access_method.h[cpp] 只支持insert的bulk操作,insert的数据先保存在一个外部存储里,最大内存使用10MB大小,commit时再全部读出处理。 btree_key_generator.h[cpp] 该对象封装了一套解析解析算法,目的是解析出obj中的索引key,MongoDB相比较于传统的DB系统,在索引创建上支持Array结构,Array数据内容会根据索引扩展出来。 例如: 索引Pattern:{a: 1, b: 1} 被索引OBJ:{a: [1, 2, 3], b: 2}, 解析出来的IndexKeys:{'': 1, '': 2}{'': 2, '': 2}{'': 3, '': 2} 特别说明的是,只支持并列的一个数组索引,如果是多个数据都想被索引,会失败。因为会造成索引数量的不可控(A*B)。 实际上,MongoDB这里提供了两个版本的算法,V0和V1,2.0以后的MongoDB默认采用了V1,V0与WT存储引擎上还有写兼容性问题,参见:SERVER-16893所以,MongoDB现在只在mmap上支持V0算法,相关的检验代码: catalog_index.cpp if (v == 0 && !getGlobalEnvironment()->getGlobalStorageEngine()->isMmapV1()) { return Status( ErrorCodes::CannotCreateIndex, str::stream() << "use of v0 indexes is only allowed with the " << "mmapv1 storage engine"); } V1和V0的不同点也集中在数组的处理上,V0版本,过于简单,很多数组结构索引解析出来的结果不完整,主要体现在了NULL的处理。例如: Pattern Obj V0 V1 {'a.b.c':1} {a:[1,2,{b:{c:[3,4]}}]} [{:3 }{:4}] [{:null}{:3}{:4}] {'a':1,'a.b':1} {a:[{b:1}]} [{:[{b:1} ],:1}] [{:{b:1},:1}] {a:1,a:1} {a:[1,2]} [{:1,:[ 1,2]}{:2,:[1,2]}] [{:1,:1}{:2,:2}] 更多的数据测试可以参考btree_key_generator_test.cpp中的测试case。 虽然官方在ReleaseNote里说明:V1版本的索引更快,更节省空间,但实际上是V0版本算法漏洞太多。参见ReleaseNote 2.0 其他 index目录下还包含了FTS,GEO相关的代码没有说明,FTS部分等阅读到了在说,毕竟使用的场景不多;GEO部分涉及到了一些地理运算,图形学运算,虽然不复杂,后面作为GEO的专题讨论。余下的代码,主要逻辑在BtreeKeyGenerator部分,尤其是V1算法,读者可以再认真学习下。
关于TCP TCP具有良好的拥塞控制,可靠传输等特性,比较适合数据库产品的通讯协议。一些对数据一致性,可靠性要求不高的产品也有采用UDP协议实现。如Redis,Memcached都支持UDP访问,但从实际的生产上来说,TCP来的更可靠,UDP的“不可靠”性质,反而会带来更多的运维负担,增加了排查问题的复杂性。 关于BSON BSON作为JSON的一种扩展,支持了Binary的数据类型,日期数据等。相比较于Protocol Buffers而言,数据是Humman Readable。MongoDB经常提及的Documents,实际上就是BSON格式数据。同样的,支持嵌套的机制,BSON可以很好的映射成Object,这相对于表结构,在灵活性上提高了一大截。数据不在是扁平的,可以是树形的组织结构,比如: { "_id" : 1, "name" : { "first" : "John", "last" : "Backus" }, "contribs" : [ "Fortran", "ALGOL", "Backus-Naur Form", "FP" ], "awards" : [ { "award" : "W.W. McDowell Award", "year" : 1967, "by" : "IEEE Computer Society" }, { "award" : "Draper Prize", "year" : 1993, "by" : "National Academy of Engineering" } ] } 当然BSON也有非常讨厌的一些地方,比如编码后的数据过大,引入了过的括号,符号等。 Wire Protocol TCP是一种Stream的通讯方式,每次请求之间没有间隔,数据源源不断的发来,那如何才能识别出一个完整的请求块?一般的解决方法是加上一个Header,Header的长度固定,用来描述余下的信息量,包括携带的信息长度。额外说明:MongoDB的网络协议都是little-endian。 参考util/net/message.h的代码: struct Layout { int32_t messageLength; // total message size, including this int32_t requestID; // identifier for this message int32_t responseTo; // requestID from the original request // (used in responses from db) int32_t opCode; }; messageLength表示整个协议的长度,因为是头部,所以Client每次发送命令时都要先将数据写到Buffer里,得到完整的长度后才能通过TCP发送整个请求。MongoDB规定,messageLength不能大于48MB(1000计算),过大的请求包一般意味着过于复杂的请求类型,或者过大的Document,这与NoSQL的设计原则也是违背的。 requestID/responseTo每个请求都有一个ID标识,同一时刻不应该出现相同的requestID,Driver和Server通过这个字段来确认是否是同一个请求的上下文 opCode操作代码,支持的类型:request-opcodes 相关的解析代码在MessagingPort::recv(Message& m)函数内,首先读取固定长度的Header(Layout),读取到Header后,根据messageLength数值做个预判是否是其他的协议类型,预判全部通过后等待读取余下的协议(messageLength-4),如果SocketBuffer中数据不足,就会阻塞在这里,等待数据包完整到达。 从网络上获得了完整的数据后交给MyMessageHandler::process来处理接下来的命令,这时opCode开始发挥作用,assembleResponse函数会根据opcode的不同,按照不同的协议去解析出相应的对象,然后执行命令。最后按照同样的协议格式发送给Client响应。发给Client的responseTo设置为与请求命令的requestID相同,以便Driver对应到相应的上下文。 参考引用 MongoDB Wire Protocol MongoDB Wire Protocol Part 1
上篇文章说到,MongoDB的网络通讯协议流程。拿到请求对象后,会调用assemblyResonse函数处理。这部分的代码实在没什么章法可言,if-else遍地,实在不怎么优雅。可以感受到随着需求的增长,很多代码都是硬套上去的。本篇介绍的是基本的处理请求,基本请求所指的是command命令以外的处理行为。 assembleResponse 首先,获取到线程绑定(ThreadLocal)Client对象,并对权限模块初始化,更新Auth Cache中过期或者失效的权限信息。然后,根据配置记录diaglog;接着统计操作计数器。接着,调用receivedXXXX函数,执行相关请求,其间会对权限进行判断。最后,根据slowlog配置,记录Profiling数据。 Query为例: try { ... Status status = client-&gt;getAuthorizationSession()-&gt;checkAuthForQuery(ns, q.query); audit::logQueryAuthzCheck(client, ns, q.query, status.code()); uassertStatusOK(status); ... catch (AssertionException&amp; e) { ... ok = false; } checkAuthForQuery之后会调用isAuthorizedForActionsOnNamespace做具体的验证功能: if (!isAuthorizedForActionsOnNamespace(ns, ActionType::find)) { return Status(ErrorCodes::Unauthorized, str::stream() &lt;&lt; &quot;not authorized for query on &quot; &lt;&lt; ns.ns()); } isAuthorizedForActionsOnNamespace函数用来验证资源和动作的权限合法性,不同的操作都对应自己的一套动作,相关的对应关系总结如下: OP_CODE VALUE receivedXXXX checkAuthForXXXX ActionType OP_UPDATE 2001 receivedUpdate checkAuthForUpdate update OP_INSERT 2002 receivedInsert checkAuthForInsert createIndex/insert OP_QUERY 2004 receivedQuery checkAuthForQuery find OP_GET_MORE 2005 receivedGetMore checkAuthForGetMore listCollections/listIndexes/find OP_DELETE 2006 receivedDelete checkAuthForDelete remove OP_KILL_CURSORS 2007 receivedKillCursors checkAuthForKillCursors killCursors PS: 2003操作代码已经废弃,目前是保留字段 command请求也是通过OP_QUERY发送过来的,对于command请求,代码上分之处理,个人不是很喜欢这个混用OP_CODE的风格,也许是伟大的历史原因造成的 inprog killop unlock 不走以上流程,通过Query包封装传递过来后,调用了相应的处理函数处理 command的权限验证额外说明 所有的ActionType定义在db/auth/action_types.txt中,非常多 isAuthorizedForActionsOnNamespace 主要对Action和Resource封装,然后调用_isAuthorizedForPrivilege完成功能。 简单介绍下Privilege,Action就是对数据的操作,比如Query,Insert都可以归纳为Action;Resource就是数据集合,可以是Collection,也可以是DB,那Privilege就是Action*Privilege的组合,一个Privilege可以含有多个Action,但在Privilege维度上,Action都只能与一个(或者表达式)Resource组合。Privilege的集合可以组合成Role概念,方便用户配置。 这个函数的处理算法是,遍历授权过的所有用户,和待验证的Resource比较,如果找到,则对比Action,所有的Action都找到的话,则通过验证。 for (UserSet::iterator it = _authenticatedUsers.begin(); it != _authenticatedUsers.end(); ++it) { User* user = *it; for (int i = 0; i getActionsForResource(resourceSearchList[i]); unmetRequirements.removeAllActionsFromSet(userActions); if (unmetRequirements.empty()) return true; } }
微博,论坛类的内容管理业务,是非常适合MongoDB的文档结构,因为其JSON结构支持嵌套,数组等数据结构。
这类案例非常多,建议先通过互联网搜索技术方案。
_id相当于一个唯一标识,背后也是一个uniq key索引。从通用性的角度上看,MongoDB用其独特的算法生成了一个唯一标识,可以认为是绝不冲突的。并且MongoDB内部同步时,也严重依赖这个_id索引。
当然,如果你的业务比较独特,希望自定义_id,MongoDB完全支持的。但建议也是自增形式,对性能有好处。
这么巧,阿里云有MongoDB异地多活的解决方案。
MongoDB自身的NoSQL特性,决定了其存储读取的性能都非常高。另外,非常非常重要的原因是,MongoDB具有灵活的扩展性,完善的Sharding机制。另外,5W并发的成功案例非常多。
建议检查下自己的语法,内嵌查询肯定是可以的
注意配置cacheSizeGB,建议低于系统内存的50%。更深入的调整还有些Eviction参数,但调整起来比较麻烦。建议先尝试设置cacheSizeGB
MongoDB 3.2中将InMemory是企业版功能,同时提供了另外一个存储引擎命名为ephemeralForTest。
但实际上MonogoDB 3.0.6的InMemory就是MongoDB 3.2的ephemeralForTest。
参考配置:
storage:
engine: "ephemeralForTest"