开发者学堂课程【MongoDB 快速入门:ChangeStreams 使用及原理】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/49/detail/1004
ChangeStreams 使用及原理(二)
三、使用介绍
1、change stream 的具体使用
接下来介绍关于 change stream 的具体使用,change stream 有哪些参数以及change stream 数据突出的格式。首先给出来了MongoShell的示例>db.watch( ,{fullDocument;True}),如何根据MongoShell去使用权限stream。第一部分参数是db参数,db部分可以有三个参数,第一个是单个db的维度,即 db.watch。还有全部db:db.getMongo.watch 即监控所有db。单个表的 db就是db.collection.watch即单个表的力度。第二个是 Aggregate 框架,参数默认可以留空,用户如果后续有过滤计算的需求可以添加到stage 里面,比如用户可以 $match,match 是匹配到用户感兴趣的event,比如用户需要 insert 和 update 操作可以在里面进行一些匹配,另外拉取到一个字段,用户可能不需要这么多字段,只需要几个字段,通过 projection 进行映射,拿到自己感兴趣的字段。第三个是 change stream 的具体的参数,比如 fullDocument 是吐出整个 Post Image 。默认是没有的。resumeAfter 是根据输入的token 断点续传。startAfter是根据输入的token 启动新的监听,新的监听流。二者的区别是前面这种情况对于表在中间过程断开了,表被jroup 然后断开了,如果再进行恢复的话,其实是恢复不了的,因为本身的表是job 了。startAfter 是启动了一个新的监听流,是没有问题的。startAtOperationTime 是根据输入的时间戳启动监听。maxAwriteTimeMS是超时时间即设置的超时时间内没有数据返回就是连接中断了。另外 batchSize 就是一次返回 batch 的大小,一次性返回聚合。
2、具体返回的 event 格式
详细的返回 event 的格式,比如 ID 字段是存储元信息,目前元信息只包括data字段,data 就是存储的数量 token,string 每次都会把 event 包括 resume token,用户拿到 token 后,可以进行存储,下次连接断开可以根据 token 进行断点续传。operation Type 即操作类型,包括insert,delete,replace,update,drop,rename,dropDatabase,invalidate。ns 是操作的命名空间,就是 namespace 可以是 db 下面的表。to 是只用于rename collection的时候才会出现。Rename collection 一个新的命名空间。documentkey包括_id即文档出现id是什么。updateDescription 是只有 operation Type=update 的时候才会出现,相当于增量的修改,比如修改了某个字段,删除了某个字段。所以说updateDescription 是吐出了某个增量的修改。下面有例子会进行介绍。cluster Time是一个时间戳,相当于 ts 字段,它是一个混合逻辑的时钟。txnNumber 是只在事物里出现,是事物里单调递增的序列号。Logic session 是请求所在的 session 的ID。
3、Insert
下面举几个详细的例子,比如 insert 操作,用户insert了一条数据,x=1,就会吐出右边的event的格式,event 格式里面本来是一个文档类型,首先是 id,id 下面是data,就是序列化以后的一个字符串。operation是一个insert类型,表示是一个插入。clusterTime 是一个时间戳,时间戳是一个64位,高位是一个32位的秒接时间戳。低位是一个计数。fullDocument 就是整个操作的 post majority,更新后的一个文档。Ns(namespace)是文档的主建 id。
4、update
update 的操作也基本类似,就是 operationType 变成了 replace 操作,fullDocument操作突出了整个 Document 更新以后的数据。Update如果是像$set或者是$on set场景,他没有吐出附到字段,也就是没有 preimage,包括 updateDescription 字段。举个例子更新了一个d字段,d 这个Fields 更新以后值是4,因为删了一个字段,比如说c字段,体现在 removedField s里面,这时如果用户想要拿到整个 preimage 就需要设置 fullDocument=true 参数就可以在更新场景下拿到整个更新的文档。
5、drop
举 Drop 的例子,比如监听了某个表,这个表后面被 Drop,它会先吐述一个 Drop operation Type,之后还会再吐出一个 invalidat e事件,表示这个表已经被删掉了,再进行就没有意义了,然后这个时候连接也会被断开。
四、原理介绍
1、基本原理
下面介绍一个关于 ChangeStream 的一个基本原理,基本的原理包括了副本集和分片集群,首先是副本集的一个场景,用户启动了一个 ChangeStream,watch一个表的db,甚至是说所有的 db,这个请求发到 MongoDb 里面,这个请求最终会建立Cursor,用户通过 Cursor 不断来进行一个 getMore 请求,拿到用户所希望得到的数据,这个机制和用户 find get more 数据原理上基本是相同的。在内部ChangeStream,在副本集里面到底进行了哪些操作,这里给出了一个详细的例子以及过程,第一个阶段 MongoDb 收到请求以后会先过滤 oplog,也就是先去 oplog表,然后过滤 oplog,根据设置的参数,用户只要某个表,别的数据都不要了就进行过滤,另外还会过滤一些本身没用的一些 oplog 数据,比如像 oplog event。第二阶段过滤完以后会把 oplog 数据转化成 change stream event,因为两个格式是不一样的,因此需要转换。接着需要去判断是否需要返回 invalidate,比如说进行了一个表,表被删掉了这个时候就要返回 invalidate,如果不是这个操作就没必要返回,下面这个阶段 change stream 内部需不需要判断,是不是可以恢复,比如用户指定了一个时间戳或者 是token,需要去进行判断是否进行恢复用户直接请求,如果是invalidate,则需要处理将 cursor 关闭,执行关闭的逻辑。最后用户参数还设置fullDocument=true,会进 行一次额外的 query,因为本身 oplog 是不包括 pose image的,MongoDb 为了实现 pose image 的语义,需要进行一次额外的 query 请求,这一介绍是关于 MongoDb 副本集状况下运转的一个过程,他本身就是 ppl 的模式,是根据一个 agreet 框架来实现的,就是一个个的 stage,上游 stage 的输出作为下游 stage 的输入,这时 ppl 链式的一种模式。
在分片集群下原理基本跟副本集是一致的,只是说分片集群的 Shard 跟副本集基本上是一致的,分片集群的 MongoS 需要去承担一个转发以及消息聚合,比副本集更多的一个功能。
这里给出了一个详细的例子,比如用户发出了一个请求,告诉 mongosqinggeiwo 10:以后的db1的所有变更数据,mongos 收到请求之后会将请求发送给所有的shard上,建立三个curser,所有的请求与用户请求是一样的,告诉shard请给我10:00以后的db1的所有变更数据。假设以shade2为例,首先会去查看oplog表,拿到10点以后的数据,10点以前都是不要的,发现10点以后db1上的数据,操作op=u是一个update操作,发现操作符合就会返回给mongos。继续操作10:10,是一个db2的,不是db1,这条数据就是没有必要的,10:20这条语句是db1的,本身是delete操作,这个语句也是符合的,也会返回给mongos。此外其他的shell也是同样的道理,比如shard返回op=i的操作,shard返回op=d的操作,这些语句都会在mongos上进行聚合,排序返回给用户,change stream一个个吐出来,顺序是按时间戳进行排序的,这里介绍了mongos如何处理分发。
关于聚合mongos本身不是这样粗暴,change stream是一个实时流失的过程,消息是不断进行的。不会是一次等待数据,一次进行排序返回,用户的实时性将会收到极大的损失。所以MongoDB采用了更加细粒度的方式,解决消息如何排序,如何吐出消息。
这里举出了一个例子,正方形方框里面的数字表示oplog或者是event的时间戳,当下这个情况是mongos已经返回了所有时间戳<=2的数据,之前介绍mongos到shell上是建立了一部分的cursor,每个cursor都会有一个队列docbuffer queue,存放从shell上拉取的数据,比如shard拉取到的是4、6、12、13,shard2拉到的是5、9、11、14,shard3拉到的是3、7、8、10。Mongos会根据本身聚合的逻辑,总结来说它就是多路归并和多链接的一种算法,比如每次比较都是比较队列的docbuffer,队列头部的元素将最小的拿出,shard1的4,shard2的5,和shard3的3,先将三个进行比较,发现3是最小的,将3拿出,接着是比较4、5、7,4是第二小的,将4拿出。依次拿出了3、4、5、6、7、8、9、10,10条event,这时会将消息排完序后返回给用户。
返回之后mongos会继续进行刚才的过程,继续进行数据的排序拉取,这时会去查看比如docbuffer queue1会有12和13,docbuffer queue2有11和14,docbuffer queue3目前是没有数据的,没有数据就没有办法进行排序,比如现将11返回,可能在docbuffer queue3上需要拉取的数据是10,比11小是有可能的。可能因为网络原因shard3的数据较晚,也可能因为shard3内部中间卡掉,或者是其他的原因,总之Shard3的数据来的比较晚,所以这时不能立刻返回数据,这时mongos会继续发送三条getmore请求,到三个shard上,继续拉取数据,之后放到docbuffer queue里面缓存。比如shard1返回两条数据20和22,shard2返回两条数据21和27,shard3没有数据返回,可能用户监听表shard没有数据,shard3没有数据不会什么都不返回,会返回承诺,承诺是告诉mongos虽然现在没有数据,但是如果有数据返回的时间戳至少是大于17。mongos得到承诺之后,就可以对17之前的数据进行排序,然后返回没有必要干等。这时mongos就会把11、12、13、14按照时间戳进行排序,排序结束后返回给用户。并且更新minPromisedSortkey=17,下面的过程就会继续进行重复,不断地mongos请求get shard 拉取数据,在docbuffer queue里进行缓存、排序,这样的过程。
最后对本节课内容进行总结,首先介绍了什么是change stream,change stream本身就是基 MongoDB oplog 实现的实时增量吐出数据,为什么要使用change stream,change stream 可以满足用户的多项需求,比如监控、数据同步、分析、推送等等之类的需求,用户可以根据 change stream 达到非常灵活的需求,最后介绍了change stream内部如何去实现,分别介绍了 MongoDB 副本集是如何实现的,分片集群是如何实现的,另外还介绍了如何对接 change stream 。