MongoDB 4.2 内核解析 - Change Stream

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介:

MongoDB 从3.6版本开始支持了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增强),用于订阅 MongoDB 内部的修改操作,change stream 可用于 MongoDB 之间的增量数据迁移、同步,也可以将 MongoDB 的增量订阅应用到其他的关联系统;比如电商场景里,MongoDB 里存储新的订单信息,业务需要根据新增的订单信息去通知库存管理系统发货。

Change Stream 与 Tailing Oplog 对比

在 change stream 功能之前,如果要获取 MongoDB 增量的修改,可以通过不断 tailing oplog  的方式来 拉取增量的 oplog ,然后针对拉取到的 oplog 集合,来过滤满足条件的 oplog。这种方式也能满足绝大部分场景的需求,但存在如下的不足。

  1. 使用门槛较高,用户需要针对 oplog 集合,打开特殊选项的的 tailable cursor  ("tailable": true, "awaitData" : true)。
  2. 用户需要自己管理增量续传,当拉取应用 crash 时,用户需要记录上一条拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再继续拉取。
  3. 结果过滤必须在拉取侧完成,但只需要订阅部分 oplog 时,比如针对某个 DB、某个 Collection、或某种类型的操作,必须要把左右的 oplog 拉取到再进行过滤。
  4. 对于 update 操作,oplog 只包含操作的部分内容,比如 {$set: {x: 1}} ,而应用经常需要获取到完整的文档内容。
  5. 不支持 Sharded Cluster 的订阅,用户必须针对每个 shard 进行 tailing oplog,并且这个过程中不能有 moveChunk 操作,否则结果可能乱序。

MongoDB Change Stream 解决了 Tailing oplog 存在的不足

  1. 简单易用,提供统一的 Change Stream API,一次 API 调用,即可从 MongoDB Server 侧获取增量修改。
  2. 统一的进度管理,通过 resume token 来标识拉取位置,只需在 API 调用时,带上上次结果的 resume token,即可从上次的位置接着订阅。
  3. 支持对结果在 Server 端进行 pipeline 过滤,减少网络传输,支持针对 DB、Collection、OperationType 等维度进行结果过滤。
  4. 支持 fullDocument: "updateLookup" 选项,对于 update,返回当时对应文档的完整内容。
  5. 支持 Sharded Cluster 的修改订阅,相同的 API 请求发到 mongos ,即可获取集群维度全局有序的修改。

Change Stream 实战

以 Mongo shell 为例,使用 Change Stream 非常简单,mongo shell 封装了针对整个实例、DB、Collection 级别的订阅操作。

db.getMongo().watch()    订阅整个实例的修改
db.watch()               订阅指定DB的修改
db.collection.watch()    订阅指定Collection的修改
  1. 新建连接1发起订阅操作
mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000})  最多阻塞等待 1分钟
  1. 新建连接2写入新数据

         

mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })
  1. 连接1上收到 Change Stream 更新
mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

 

  1. 上述 ChangeStream 结果里,_id 字段的内容即为 resume token,标识着 oplog 的某个位置,如果想从某个位置继续订阅,在 watch 时,通过 resumeAfter 指定即可。比如每个应用订阅了上述3条修改,但只有第一条已经成功消费了,下次订阅时指定第一条的 resume token 即可再次订阅到接下来的2条。

    

mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

Change Stream 内部实现

watch() wrapper

db.watch() 实际上是一个 API wrapper,实际上 Change Stream 在 MongoDB 内部实际上是一个 aggregation 命令,只是加了一个特殊的 $changestream  阶段,在发起 change stream 订阅操作后,可通过 db.currentOp() 看到对应的 aggregation/getMore 操作的详细参数。

{
      "op" : "getmore",
      "ns" : "test.coll",
      "command" : {
        "getMore" : NumberLong("233479991942333714"),
        "collection" : "coll",
        "maxTimeMS" : 50000,
        "lsid" : {
          "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
        },
      },
      "planSummary" : "COLLSCAN",
      "cursor" : {
        "cursorId" : NumberLong("233479991942333714"),
        "createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
        "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
        "nDocsReturned" : NumberLong(1),
        "nBatchesReturned" : NumberLong(1),
        "noCursorTimeout" : false,
        "tailable" : true,
        "awaitData" : true,
        "originatingCommand" : {
          "aggregate" : "coll",
          "pipeline" : [
            {
              "$changeStream" : {
                "fullDocument" : "default"
              }
            }
          ],
          "cursor" : {

          },
          "lsid" : {
            "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
          },
          "$clusterTime" : {
            "clusterTime" : Timestamp(1577774144, 1),
            "signature" : {
              "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
              "keyId" : NumberLong(0)
            }
          },
          "$db" : "test"
        },
        "operationUsingCursorId" : NumberLong(7019500)
      },
      "numYields" : 2,
      "locks" : {

      }
    }

resume token

resume token 用来描述一个订阅点,本质上是 oplog 信息的一个封装,包含 clusterTime、uuid、documentKey等信息,当订阅 API 带上 resume token 时,MongoDB Server 会将 token 转换为对应的信息,并定位到 oplog 起点继续订阅操作。

struct ResumeTokenData {
    Timestamp clusterTime;
    int version = 0;
    size_t applyOpsIndex = 0;
    Value documentKey;
    boost::optional<UUID> uuid;
};

ResumeTokenData 结构里包含 version 信息,在 4.0.7 以前的版本,version 均为0; 4.0.7 引入了一种新的 resume token 格式,version 为 1; 另外在 3.6 版本里,Resume Token 的编码与 4.0 也有所不同;所以在版本升级后,有可能出现不同版本 token 无法识别的问题,所以尽量要让 MongoDB Server 所有组件(Replica Set 各个成员,ConfigServer、Mongos)都保持相同的内核版本。

更详细的信息,参考 https://docs.mongodb.com/manual/reference/method/Mongo.watch/#resumability

updateLookup

Change Stream 支持针对 update 操作,获取当前的文档完整内容,而不是仅更新操作本身,比如

mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

上面的 update 操作,默认情况下,change stream 会收到 {_id: 101}, {$set: {age: 20}  的内容,而并不会包含这个文档其他未更新字段的信息;而加上 fullDocument: "updateLookup" 选项后,Change Stream 会根据文档 _id 去查找文档当前的内容并返回。

需要注意的是,updateLookup 选项只能保证最终一致性,比如针对上述文档,如果连续更新100次,update 的 change stream 并不会按顺序收到中间每一次的更新,因为每次都是去查找文档当前的内容,而当前的内容可能已经被后续的修改覆盖。

Sharded cluster

Change Stream 支持针对 sharded cluster 进行订阅,会保证全局有序的返回结果;为了达到全局有序这个目标,mongos 需要从每个 shard 都返回订阅结果按时间戳进行排序合并返回。

在极端情况下,如果某些 shard 写入量很少或者没有写入,change stream 的返回延时会受到影响,因为需要等到所有 shard 都返回订阅结果;默认情况下,mongod server 每10s会产生一条 Noop 的特殊oplog,这个机制会间接驱动 sharded cluster 在写入量不高的情况下也能持续运转下去。

由于需要全局排序,在 sharded cluster 写入量很高时,Change Stream 的性能很可能跟不上;如果对性能要求非常高,可以考虑关闭 Balancer,在每个 shard 上各自建立 Change Stream。

参考资料

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
2月前
|
运维 监控 NoSQL
【MongoDB 复制集秘籍】Secondary 同步慢怎么办?深度解析与实战指南,让你的数据库飞速同步!
【8月更文挑战第24天】本文通过一个具体案例探讨了MongoDB复制集中Secondary成员同步缓慢的问题。现象表现为数据延迟增加,影响业务运行。经分析,可能的原因包括硬件资源不足、网络状况不佳、复制日志错误等。解决策略涵盖优化硬件(如增加内存、升级CPU)、调整网络配置以减少延迟以及优化MongoDB配置(例如调整`oplogSize`、启用压缩)。通过这些方法可有效提升同步效率,保证系统的稳定性和性能。
48 4
|
2月前
|
存储 人工智能 安全
操作系统的心脏:内核深度解析
【8月更文挑战第13天】 在数字世界的每一次跳动中,都能感受到操作系统内核的强大生命力。本文将带你走进操作系统的核心——内核,揭示它如何协调和管理计算机硬件资源,保证软件运行的高效和稳定。从内核的定义和功能,到它的结构和设计哲学,再到现代操作系统中的创新与挑战,我们将一起探索这个让计算机系统“活着”的秘密所在。
52 3
|
23天前
|
存储 算法 安全
操作系统的心脏:内核深入解析
本文将带您走进计算机的大脑—操作系统内核,探索它如何管理硬件资源、提供系统服务,并确保多任务高效运行。文章以浅显易懂的语言,逐步揭示内核的神秘面纱,从基础概念到实际应用,让您对操作系统的核心组件有更深的理解。
54 5
|
24天前
|
存储 资源调度 监控
操作系统的心脏:内核深度解析
在数字世界的庞大机器中,操作系统扮演着至关重要的角色。而作为操作系统核心的内核,其重要性不言而喻。本文将深入浅出地探讨操作系统内核的基本概念、主要功能和工作原理,以及它如何影响计算机的整体性能和稳定性。我们将从内核的设计哲学出发,逐步深入到内核的各个组成部分,包括进程管理、内存管理、文件系统和设备驱动等关键模块。最后,文章还将讨论当前操作系统内核面临的挑战和未来的发展趋势。通过这篇文章,读者将获得对操作系统内核更深层次的理解,从而更好地把握计算机系统的运行机制。
37 1
|
29天前
|
存储 资源调度 算法
操作系统的心脏:内核深入解析
本文将带你走进操作系统的核心—内核,通过浅显易懂的语言解释什么是内核、它如何工作以及为什么它对整个系统至关重要。我们将从内核的定义和功能出发,逐步深入到它的结构和设计哲学,最后探讨内核在现代计算环境中面临的挑战和未来发展方向。无论你是计算机新手还是有一定基础的学习者,这篇文章都会为你揭开操作系统内核的神秘面纱。
|
2月前
|
Java API
Java 8新特性:Lambda表达式与Stream API的深度解析
【7月更文挑战第61天】本文将深入探讨Java 8中的两个重要特性:Lambda表达式和Stream API。我们将首先介绍Lambda表达式的基本概念和语法,然后详细解析Stream API的使用和优势。最后,我们将通过实例代码演示如何结合使用Lambda表达式和Stream API,以提高Java编程的效率和可读性。
|
27天前
|
人工智能 并行计算 安全
探索操作系统的心脏:内核深度解析
在数字世界的每一次跳动中,都能感受到一个强大而隐形的力量在默默支撑着一切——这就是操作系统的内核。本文将带你走进这个神秘而又强大的核心世界,从内核的设计哲学到它的架构布局,再到它如何与硬件、软件协同工作,以及面对现代挑战时的应对策略。我们将一起探索那些让操作系统能够高效、安全运行的秘密,解锁内核的奥秘,理解它对整个计算生态的重要性。准备好跟随我们的脚步,深入操作系统的核心,一窥究竟吧!
30 0
|
2月前
|
算法 安全 调度
操作系统的心脏:内核深度解析
【8月更文挑战第12天】 本文将带你进入操作系统的核心—内核,探索其设计、功能和工作原理。我们将通过通俗易懂的语言和生动的比喻,让你轻松理解这个复杂但至关重要的主题。无论你是计算机科学的初学者,还是有一定基础的开发者,这篇文章都将为你提供新的视角和深入的理解。让我们一起揭开操作系统内核的神秘面纱,探索它的奥秘吧!
32 3
|
2月前
|
C# 开发者 Windows
全面指南:WPF无障碍设计从入门到精通——让每一个用户都能无障碍地享受你的应用,从自动化属性到焦点导航的最佳实践
【8月更文挑战第31天】为了确保Windows Presentation Foundation (WPF) 应用程序对所有用户都具备无障碍性,开发者需关注无障碍设计原则。这不仅是法律要求,更是社会责任,旨在让技术更人性化,惠及包括视障、听障及行动受限等用户群体。
51 0
|
2月前
|
存储 资源调度 安全
操作系统的心脏:内核深度解析
本文将带你深入操作系统的核心—内核,探索其如何作为系统的中枢神经,协调硬件与软件之间的复杂交互。我们将从内核的基本概念出发,逐步揭示其设计哲学、关键组件以及在现代计算环境中的作用。通过这篇文章,你将获得对操作系统内核工作原理的深刻理解,并认识到它在维护系统稳定性和性能中的关键角色。

推荐镜像

更多
下一篇
无影云桌面