一、概述
(一)什么是 ChangStreams
ChangeStreams 功能是基于 MongoDB Oplog 实现的,Oplog 提供增量数据, ChangeStreams 在 Oplog 之上包裹一层应用,对外提供一个 API 接口,将数据进行实 时推送,推送的数据类型包括:Insert、Delete、Update、Invalidate、DDL。Invalidate: 主要适用于监控的表被删除时,监控此时没有意义,会返回 Invalidate 的事件;DDL 事件 是数据库操作语言,如 Create Database、Drop Database 等。总的来说, ChangeStreams 是基于 Oplog 实现的,提供推送实时增量推送功能。
(二)支持场景
版本的要求: ChangeStreams 支持副本集和分片集群的 MongoDB 形态,版本版本>=3.6。
支持粒度的三个维度:
- 全部 DB
- 单个 DB
- 单个表
引擎的要求:
WiredTiger 引擎。
ReadConcern 的要求:
ChangeStreams<=4.0 时,需要 majority 的 ReadConcern;
考虑到用户对于数据实时性的要求比较强,对数据一致性的要求比较弱,所以 ChangeStreams>=4.2 时放开了这个要求。我们建议用户还是配置一个 Majority 的 ReadConcern 级别,不过用户可根据自己场景的不同,适当放开这个一致性要求。
(三)版本历史
MongoDB 3.6 以前
MongoDB3.6 以前,用户只能自己从 Local.Oplog.Rs 表拉取增量变更的 Oplog 数
据。
困难点:1.⼿动设置过滤条件。2.分⽚集群处理⾮常复杂。3.⾃⼰管理断点续传。
MongoDB 3.6
MongoDB ChangeStreams 正式发布后,能够提供一个实时吐出增量变更信息,方 便用户的运维处理。
支持 PostImage。数据发生变更,发生 Update 操作,那么数据发生变更之前它是 PreImage,数据发生变更之后它是 PostImage。3.6 提供了 PostImage 的镜像,同时它 还支持断点续传管理.
MongoDB 4.0-4.2
ChangeStreams 不断进行优化,更好的支持分片集群的场景。
4.0.7 支持 PostBatchResumeToken,使得位点能够更好推进,防止用户在发生 MongoDB 重启或客户端重启导致数据位点回退,进而引发大批量无效的数据扫描。
4.0 的多文档事务,4.2 的分布式事务,ChangeStreams 都进行了很好地支持。
支持指定时间点启启动/恢复的功能。
用户可以指定任意时间点,ChangeStreams 都可以从此时间点进行启动 ChangeStreams 流以及恢复数据。
MongoDB 4.4+
MongoDB ChangeStreams 性能持续进行优化。
即将支持 PreImage,在性能上也得到了较大的提升。
随着 MongoDB ChangeStreams 版本不断进行迭代,功能和性能上做了很大的优
化。
(四)使用场景
案例 1.监控
用户需要及时获取变更信息(如账户相关的表), ChangeStreams 可以提供监控功 能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。
案例 2.分析平台
如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来, 推到下游的计算平台, 如说像 Flink、Spark 等计算平台等等。
案例 3.数据同步
基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的,那么这个集群可以做一个热备份,假如源端集群发生网络不通等 等之类的变故,备集群就可以接管服务。
还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一云端数 据库发生不可服务,就可以从文件里恢复出完整的 MongoDB 数据库,继续提供服务。 (当然,此处还需要借助定期全量备份来一同完成恢复)
另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国 等等。
案例 4.消息推送
假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信 息给想了解的用户,用户能够实时收到公交车变更的数据,非常便捷实用。
总的来说,用户可以于 MongoDB ChangeStreams 功能,进行平台化构建,满足 用户的各项需求。当然,用户的需求可以是多样化,不仅仅局限这几个案例。
二、功能介绍
(一)特性
ChangeStreams 可归纳为 5 部分
持久性:
Majority-CommittedChanges
数据能够保证持久化,不会被回滚。
断点续传:
通过 Resume Token 进行断点续传的功能。
顺序性:
对于副本集保证线性一致性,对于分片集群保证因果一致性。
安全性:
ChangeStreams 可以进行安全控制。
灵活性:
因为 ChangeStreams 本身是基于 MongoDB Changes Aggregate 框架来实现 的,所以用户还可以在 Aggregate 上添加一些步骤,实现过滤、计算等需求。
(二)Majority-Committed Changes 的持久化
如用户写请求写到了 Primary 上,那么这个时候 Primary 上产生一条 Oplog,此时 ChangeStreams 并不会把 Oplog 吐出来,它还会把数据写到 Secondary 上,等 Secondary 写成功后才把这个数据吐出来,防止用户写 Primary 成功,之后 Primary 发 生宕机的情况,Secondary 成为新的 Primary,导致数据被回滚。
因此,ChangeStreams 吐出的数据都是持久化成功的数据。
(三)断点续传
举例:如用户从 MongoDB 去拉取一个实时的数据,此数据是根据时间戳递增的,如 8:00、9:00、10:00,那么如果 10:00 时间戳 MongoDB 发生宕机,或者用户本身的服务 发生意外,导致连接电路断开,这时候如果 MongoDB 或 Server 端恢复服务,希望数据 继续接着 10:00 开始拉取。
恢复后服务端发送一条消息“请给我 10:00 以后的数据。”那么 MongoDB 收到这个 消息后,继续把 10:00 以后的数据源源不断的通过 ChangeStreams 推送出来,10:00 后 是 11:00、 12:00,如此这般服务就能正常运行。
(四)顺序性—如何满足因果一致性
假如用户写请求,将 Insert,a=1,写在 Shard2 上,此时这条语句通过 ChangeStreams 吐出来了.
后来用户又 Update 了文档,把它从 a=1 改成 a=2,这条操作落在了 Shard2 上,此 时 ChangeStreams 会把第 2 条文档输出,也就是说这两条文档是具有前后因果性的,不 会先出第 2 条文档再吐出第 1 条文档,顺序保持严格的因果性。
如用户 Insert 了 1 条数据是 a=1,然后它落在 Shard2 上,然后又 Insert 了 1 条数据 a=2,但它落在 Shard3 上。
那么这 2 条数据它是同时落在 2 个不同的 Shard 上的,那么 ChangeStreams 它吐 出的顺序可能是先 a=2,再 a=1,也就是说这 2 条数据顺序在不满足的条件下,那么它输 出的顺序是不能够保证顺序性。
(五)ChangeStreams vs Oplog 拉取的对比
对接/使用成本:
Oplog 拉取是远远高于 ChangeStream 的,因为用户需要自去监听一个表,然后 Find+getMore 拉取,需要做过滤,对事务还需要进行一些额外的处理等等。
副本集支持:
Oplog 拉取和 ChangeStream 都是支持的。
DML 支持:
Oplog 拉取和 ChangeStream 都是支持的。
DDL 支持:
Oplog 拉取是全部支持的,ChangeStream 目前支持 dropCollection,dropDataba se,renameCollection 这 3 个语句,但后续官方会持续完善 DDL 语句,如 Create Index,Drop Index 等。
集群板支持:
Oplog 拉取必须关闭 Balancer,否则拉取出来 Oplog 不能保证因果一致性。 另外 DDL 需要去重,在 ChangeStream 里不需要额外去处理,成本对接很低。
事务处理:
Oplog 拉取的事务处理比较繁琐,如事务中 5 条语句同步 2 条语句发生断开, 重启后 事务处理会很繁琐。ChangeStream 不需要额外的对接,只当做一个普通语句。
断点续传:
Oplog 拉取是根据时间戳的,ChangeStream 可以根据时间戳,也可以根据 Token。
实时性/吞吐:
Oplog 拉取的实时性和吞吐比 ChangeStream 更高,ChangeStream 为了兼顾一 致性,加上目前实现方式是通过副本集上单线程拉取,因此 ChangeStream 性能上略低 于 Oplog 拉取。
但是未来MongoDB官方将会持续进行优化,使ChangeStream的性能追上Oplog, 缩小 Gap。
权限控制 :
Oplog 拉取只能是 2 种权限,All or Nothing,All 意思是全部的权限,Nothing 的意 思是没有权限,所以要么能监听到所有表所有数据,要么 1 条都拉不到。
ChangeStream 权限控制会更细粒度,用户可以根据目前账号的权限,有哪些表权限 就提供哪些表权限的拉取。
三、使用介绍
(一)MongoShell 示例
如何根据 Mongoshell 去使用 ChangeStream
1)DB 参数
DB 这个部分参数可以有 3 个参数:
- 单个 DB:db.Watch()
- 全部 DB:db.GetMongo() . Watch()
- 单个表:db.Collection.Watch()
2)Aggregate 的框架
这个参数默认可以留空,用户如果有过滤、计算等需求可以添加到 Stage 里。
如用户可在$Match 匹配到感兴趣的一些 Field,需要 Insert 和 Uptate 操作,那么可 以就在里面进行一个匹配。另外拉取到一个字段,用户可能不需要这么多字段,只需要某几 个字段,那么就可以通过$Project 去进行映射,拿到自己感兴趣的字段。
3)ChangeStream 的一些详细参数
如 FullDocument 是吐出整个,默认是没有的。
ResumeAfter 是根据 Token 进行断点续传,StartAfter 是根据 Token 启动一个新 的监听流,这 2 个区别是,如表在中间过程被 Drop 后断开,那么 ResumeAfter 就无法 恢复这个表了,因为今天的表本身已经 Drop。
StartAfter 是启动一个新的监听流。
StartAtOperationTime 是根据输入的时间戳启动一个监听。
MaxAwaitTimeMS 是指超时的时间,用户设置超时时间,如果该时间内没有数据返 回,那么连接将会被中断。
BatchSize 就是 1 次返回的 1 个 Batch 大小,就是 1 次返回聚合 ,多少条 Event 返回。
4)具体返回 event 格式
ID 字段是存储元信息,目前元信息只包括 Data 字段,Data 字段意思是存储的是 ResumeToken,ChangeStream 每次都会把 Event 包括 ResumeToken,用户拿到 Token 后可进行存储,下次连接断开可根据 Token 进行断点续传。
OperationType 操作类型包括 Insert,Delete,Replace,Update, Rename, DropDatabase,Invalidate。
Ns 是操作命名空间,就是 Namespace。 它某个是 DB 下面的某个表。
To 是指用于 RenameCollection 后的一个新的命名空间。
DocumentKey 是包括了一个_ID,指目前操作的文档的主键_ID 是什么。
UpdateDescription是OperationType=Update的时候出现,相当于是增量的修改。
ClusterTime 是操作一个时间戳,相当于 Oplog 里的 Ts 字段,是一个混合逻辑时间 时钟。
TxnNumber 是只在事务里面出现,是一个事务内部递增的序列号,
lsid 表示 Logic Session ID,是请求所在的 Session 的 ID。
5)Insert 操作
用户 Insert 条数据,x 等于 1,就会得到如图 Event 的格式,格式为文档类型。
首先是_ID,里面包括 Data,是对 Token 进行序列化后的字符串。
然后 OperationType 是 Insert 类型,表示此操作为插入,ClusterTime 是一个 64 位 时间戳,高位是一个 32 位的秒级时间戳,低位是一个计数。
FullDocument 就是整个操作的 PostImage 更新后的一个文档,然后 Ns Docume ntKey 就是整个文档的主键 ID。
6) Update 操作
Update 操作基本类似,OperationType 变成了 Replace 操作。 FullDocument 吐 出了整个 Document 操作更新后数据。
对于$Set 或$Unset 的更新来说,它吐出的内容没有 FullDocument 的字段,意味着 这里面没有 PostImage。它包括是 UpdateDescription 的字段,如图更新了一个 d 字段, 更新以后的值是 4,另外如删了一个 c 字段,那么它是体现在 RemovedFields 里。
如果用户想在此时拿到整个 PostImage 就需要去设置 FullDocument=True 参数, 就可在此更新场景下拿到整个更新后的文档。
7)Drop 操作
如监听的某个表被“Drop”了,那么它会先吐出 “Drop”的 OperationType,之后 会再吐出一个 Invalidate 事件,表示此表已被删掉除,继续监听就失去意义,所以此时连 接会被断开。
四、原理介绍
基本原理
案例 1 副本集场景
用户启动一个 ChangeStreams,它就 Watch 了一个表或一个 DB,甚至所有的 DB,这 个请求会发到了一个 MongoDB 上面。此请求会建立一个 Cursor,然后用户通过 Cursor 不断进行 GetMore,拿到用户所希望得到的数据。
这个机制和用户去放的加 Find+GetMore、拉取一些表和数据,原理基本一样。
内部实现上,ChangeStreams 在副本集里面做了哪些操作?
案例:
第一阶段
MongoDB 收到 ChangeStreams 请求后会先过滤 Oplog,也就是说他先去拉 Oplog 表,然后过滤 Oplog,根据用户设定的参数,如用户只要某个表,那么就会过滤掉 其他库表的数据,同时它还会过滤掉本身没用的 Oplog 数据,如 Noop Event。
第二阶段
会在过滤完后,它会把 Oplog 的数据转化成 ChangeStreams Event,因为 Oplog 和 ChangeStreams Event 格式是不一样的,需要进行转换。
第三阶段
它会去判断这个是否需要返回 Invalidate,如说表被删掉了,此时就需要返回 Invalidate。
第四阶段
ChangeStreams 需要判断是否可以恢复数据流。
如用户指定了一个时间戳,指定一个 Token,需要去判断是否可以进行恢复。
第五阶段
如果是 Invalidate,则需要处理具体关闭 Cursor 逻辑。
最后
如果参数设置了 FullDocument=True,则会进⾏⼀次额外的 Query。
分片集群和副本集相比, Shard 上的功能和副本集功能基本一致,此外,Mongos 还需要去承担“消息转发”和“消息聚合”功能,
分片集群和副本集相比, Mongos 需要去承担“转发”和“消息聚合”功能。
案例:
用户向 Mongos 发出请求,要 10:00 以后所有 DB1 的变更数据,Mongos 收到请求 后,才会把请求发送给所有的 Shard 上,建立 3 个 Cursor,告诉每个 Shard “请给我 1 0:00 以后所有 DB1 的变更数据”。
以 Shard2 为例,Shard2 上会先去查看 Oplog 表,拿到 10 点以后的数据,过滤掉 10 点以前的数据。Oplog 表中,第 3 条发生在 10:00,db1 数据 op=u,是一个 Update 操 作,此操作符合,因此它会返回给 MongoS;第 4 条 10:10 符合条件,但它是 db2 不是 db1,所以这条数据会被过滤掉;第 5 条 10:20 是 db1,它本身是一个 Delete 操作,所以 这个语句也符合,它也会返回给 MongoS;
此外,别的 Shard 也是同理,如 Shard1 有一条 op=i 的操作,Shard3 有一条 op=d 的操作,这些语句都会在 Mongos 上进行聚合,排序,返回给用户,通过 ChangeStrea ms 按时间顺序吐出。
这个案例介绍了 Mongos 如何处理消息分发。关于聚合 Mongos 不会这么粗暴,因为 本身 ChangeStreams 是一个实时数据流的过程,它的消息是不断推送的,不会一次性等 待 1 个小时的数据,然后进行排序再返回,这样用户的实时性会受到极大的损失。
所以 MongoDB 采用更细粒度的控制方法去解决消息如何排序,如何吐出。
案例:
下图中,正方形方框内的数字,表示 oplog 或 event 的时间戳,
此时 Mongos 已返回“所有时间<=2”的数据,那么 Mongos 到 Shard 上建立不同 的 Cursor,每个 Cursor 都有 1 个队列 DocBuffer Queue。存放着从 Shard 上拉取的 数据,如 Shard1 拉到的数据“4、6、12、13”,Shard2 拉到的数据是“5、9、11、14”, Shard3 上拉到的数据“3、7、8、10”,然后 MongoS 会根据返回的时间戳进行聚合排 序。
总结来说就是多路归并+小顶堆的排序算法。Mongos 会比较每个 DocBuffer 队列头 部的元素哪个最小,然后将数据拿出。如图会比较 Shard1 的 4,Shard2 的 5,Shard3 的 3,发现 3 是最小的,然后将 3 拿出来。接着会比较 4,5,7,将 4 拿出来,最后依次拿到 了“3、4、5、6、7、8、9、10”,10 条 Oplog(或 Event),将这些消息聚合,排完 序以后返回给用户。
返回后,MongoS 会继续进行数据的拉取排序。
继续查看 DocBuffer Queue1 有“12、13”、 Queue2 有“11、14”、 Queue3 目前没有数据(没有数据就无法进行排序)。此时不能对 Queue1 和 Queue2 的数据排序 后返回。因为假如先返回了 Queue2 的 11,但很有可能能因为网络原因,或其他原因导致 Shard3 数据没有立刻返回,比如后面 Queue3 返回了 10,比 11 还小,如果之前 11 已经 返回,则破坏了顺序性,所以此时数据不能返回出去。
这个时候 Mongos 的处理,会继续发送 3 条 GetMore 请求,到 3 个 Shard 上,然 后自己拉取数据,然后放到 DocBuffer Queue 里进行缓存。
如下图:
Shard1 里返回两条数据“20、22”, Shard2 里返回两条数据“21、27”, Shard3 依旧没有数据返回,Shard3 没有数据不会什么都不返回,它会返回 1 个承诺,这个 承诺的作用是告诉 Mongos 虽然现在没有数据,但下次将返回>17 的数据时间戳。
Mongos 拿到这个承诺以后就知道可以对<17 的所有数据进行排序操作,这样用户就不需 要等待。
然后 Mongos 会将“11、12、13、14”按时间进行排序然后返回给用户并更新“ MinPromisedSortKey=17”。然后下面继续重复刚才的过程,它是实时流的过程,不断 的请求 Shard,然后拉取数据,然后再到 DocBuffer Queue 里进行缓存,然后进行排序 这样一个过程。
快速掌握MongoDB核心技术干货目录
电子书下载:《玩转MongoDB从入门到实战》 | https://developer.aliyun.com/article/780915 |
走进 MongoDB | https://developer.aliyun.com/article/781079 |
MongoDB聚合框架 | https://developer.aliyun.com/article/781095 |
复制集使用及原理介绍 | https://developer.aliyun.com/article/781137 |
分片集群使用及原理介绍 | https://developer.aliyun.com/article/781104 |
ChangeStreams 使用及原理 | https://developer.aliyun.com/article/781107 |
事务功能使用及原理介绍 | https://developer.aliyun.com/article/781111 |
MongoDB最佳实践一 | https://developer.aliyun.com/article/781139 |
MongoDB最佳实践二 | https://developer.aliyun.com/article/781141 |