Monstache: syncing MongoDB data to Elasticsearch with high availability
Requirements & problem statement
- We need to sync MongoDB data to Elasticsearch in real time, including data changes. After evaluating AWS DMS and Monstache, we tentatively chose Monstache for the sync.
What is Monstache?
Monstache is a sync daemon written in Go that tails MongoDB's oplog to provide real-time data synchronization and subscription between MongoDB and Elasticsearch. MongoDB must be deployed as a replica set, since the oplog only exists on replica sets.
Hands-on
Monstache is driven by a configuration file and has a rich set of options. To start it you point it at a config file, here named config.toml, shown below:
```toml
# connection settings

# print detailed information including request traces
# Enable debug logging. Keep this at the top, or the logs will not reach the log file.
verbose = true

# connect to MongoDB using the following URL
# MongoDB connection string; the target must be a replica set
mongo-url = "mongodb://192.168.7.51:27021"
#"mongodb://root:<your_mongodb_password>@dds-bp1aadcc629******.mongodb.rds.aliyuncs.com:3717"

# connect to the Elasticsearch REST API at the following node URLs
elasticsearch-urls = ["http://localhost:9200"]

# frequently required settings

# if you need to seed an index from a collection and not just listen and sync changes events
# you can copy entire collections or views from MongoDB to Elasticsearch
# namespaces take the form <database>.<collection>
direct-read-namespaces = ["mssiot_forum_merossbeta.f_posts"]

# if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces
# change streams require at least MongoDB API 3.6+
# if you have MongoDB 4+ you can listen for changes to an entire database or entire deployment
# in this case you usually don't need regexes in your config to filter collections unless you target the deployment.
# to listen to an entire db use only the database name. For a deployment use an empty string.
#change-stream-namespaces = ["mydb.col"]

# additional settings

# if you don't want to listen for changes to all collections in MongoDB but only a few
# e.g. only listen for inserts, updates, deletes, and drops from mydb.mycollection
# this setting does not initiate a copy, it is only a filter on the change event listener
#namespace-regex = '^mssiot_forum_merossbeta\.f_posts$'

# compress requests to Elasticsearch
#gzip = true

# generate indexing statistics
#stats = true

# index statistics into Elasticsearch
#index-stats = true

# use the following PEM file for connections to MongoDB
#mongo-pem-file = "/path/to/mongoCert.pem"

# disable PEM validation
#mongo-validate-pem-file = false

# use the following user name for Elasticsearch basic auth
elasticsearch-user = "elastic"

# use the following password for Elasticsearch basic auth
#elasticsearch-password = "<your_es_password>"

# use 8 go routines concurrently pushing documents to Elasticsearch
# maximum number of concurrent connections Monstache opens to ES (default 4)
elasticsearch-max-conns = 8

# use the following PEM file to connections to Elasticsearch
#elasticsearch-pem-file = "/path/to/elasticCert.pem"

# validate connections to Elasticsearch
#elastic-validate-pem-file = true

# propagate dropped collections in MongoDB as index deletes in Elasticsearch
dropped-collections = false

# propagate dropped databases in MongoDB as index deletes in Elasticsearch
dropped-databases = false

# do not start processing at the beginning of the MongoDB oplog
# if you set replay to true you may see version conflict messages
# in the log if you had synced previously. This just means that you are replaying old docs which are already
# in Elasticsearch with a newer version. Elasticsearch is preventing the old docs from overwriting new ones.
#replay = false

# resume processing from a timestamp saved in a previous run
resume = true

# do not validate that progress timestamps have been saved
#resume-write-unsafe = false

# override the name under which resume state is saved
#resume-name = "default"

# use a custom resume strategy (tokens) instead of the default strategy (timestamps)
# tokens work with MongoDB API 3.6+ while timestamps work only with MongoDB API 4.0+
resume-strategy = 0

# exclude documents whose namespace matches the following pattern
#namespace-exclude-regex = '^mydb\.ignorecollection$'

# turn on indexing of GridFS file content
#index-files = true

# turn on search result highlighting of GridFS content
#file-highlighting = true

# index GridFS files inserted into the following collections
#file-namespaces = ["users.fs.files"]

# enable clustering mode
# the Monstache cluster name; essential for high-availability mode
cluster-name = 'merossdev'

# worker mode
#workers = ["Tom", "Dick", "Harry"]

# do not exit after full-sync, rather continue tailing the oplog
#exit-after-direct-reads = false

namespace-regex = '^mssiot_forum_merossbeta\.(f_posts|\$cmd)$'

[[mapping]]
namespace = "mssiot_forum_merossbeta"
index = "f_posts"

# Logging is essential in production. Monstache writes to stdout by default;
# these settings redirect output to log files (another lesson learned the hard way!).
#[logs]
#info = "/var/monstache/log/info.log"
#warn = "/var/monstache/log/warn.log"
#error = "/var/monstache/log/error.log"
#trace = "/var/monstache/log/trace.log"
```
Start it with monstache -cluster-name merossdev -f config.toml; monstache ships as a precompiled binary, as shown in the figure below.
Now write a document into MongoDB and then query Elasticsearch, as shown in the figure below.
In practice, the other MongoDB document operations (updates, deletes, and so on) are synced to Elasticsearch in the same way.
Monstache high availability: standard mode and multi-worker mode. In both modes, cluster-name must be enabled in the config file: cluster-name = "your custom Monstache cluster name".
1. Standard mode
Principle (from the official docs): "When cluster-name is given monstache will enter a high availability mode. Processes with cluster name set to the same value will coordinate. Only one of the processes in a cluster will sync changes. The other processes will be in a paused state. If the process which is syncing changes goes down for some reason one of the processes in paused state will take control and start syncing. See the section high availability for more information." In short: within one cluster only a single process syncs data while the others sit in a paused state; if the syncing process dies, one of the paused processes is promoted and takes over. Docs: https://rwynn.github.io/monstache-site/config/
Run monstache -cluster-name merossdev -f config.toml twice in succession in a terminal to start two Monstache processes; one is actively syncing and the other is paused, as shown in the figures below.
The screenshot above shows the active (syncing) process.
The screenshot above shows the paused process.
Now kill the active process to verify that the paused process switches to the active state, as shown in the figure below.
The screenshot above confirms that the paused process has been promoted to the active state.
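The failover behavior above can be sketched as a toy simulation. This is only an illustration of the observed behavior, not Monstache's actual coordination code; the `Cluster` class and process names are made up for the example:

```python
# Toy model of monstache-style cluster coordination: one process holds the
# sync role, the rest stay paused; releasing the role lets another take over.

class Cluster:
    """Tracks which process currently owns the sync role."""
    def __init__(self):
        self.leader = None

    def try_acquire(self, name: str) -> bool:
        # The first process to ask while the role is free becomes the leader.
        if self.leader is None:
            self.leader = name
        return self.leader == name

    def release(self, name: str) -> None:
        # Simulates the active process dying.
        if self.leader == name:
            self.leader = None

cluster = Cluster()
states = {p: ("syncing" if cluster.try_acquire(p) else "paused")
          for p in ["proc-A", "proc-B"]}
print(states)  # {'proc-A': 'syncing', 'proc-B': 'paused'}

cluster.release("proc-A")  # kill the active process
states["proc-B"] = "syncing" if cluster.try_acquire("proc-B") else "paused"
print(states["proc-B"])  # syncing -- the paused process took over
```

In the real system this coordination happens through a collection in MongoDB rather than in-process state, but the promotion logic follows the same shape.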
2. Multi-worker mode
Principle (from the official docs): "workers - You can run multiple monstache processes and distribute the work between them. First configure the names of all the workers in a shared config.toml file. You can run monstache in high availability mode by starting multiple processes with the same value for cluster-name. Each process will join a cluster which works together to ensure that a monstache process is always syncing to Elasticsearch." In short: multiple workers cooperate, and every worker under the same cluster name actively syncs data; none of them is paused. If two processes share both the cluster name and the worker name, one is active and the other paused; when the active one dies, the paused process with the same worker name is promoted. You cannot start a process with a worker name that is not in the configured workers list. Docs: https://rwynn.github.io/monstache-site/advanced/#high-availability
Prerequisite: declare the workers in the config file: workers = ["Tom", "Dick", "Harry"]
Run:
```shell
monstache -cluster-name HA -worker Tom -f config.toml
monstache -cluster-name HA -worker Dick -f config.toml
monstache -cluster-name HA -worker Harry -f config.toml
```
Verification: write 10,000 documents into MongoDB at once; Monstache hashes each document id across the workers and hands the document to one of them, as shown in the figures below.
MongoDB now holds 10,000 documents waiting to be synced to Elasticsearch.
After starting the three workers, we can see that each worker syncs a comparable share of the data.
Querying Elasticsearch confirms that all 10,000 documents have been synced.
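The work distribution can be illustrated with a small sketch. Hashing the document id modulo the number of workers is an assumed scheme chosen for the example; it is not necessarily Monstache's exact hashing algorithm:

```python
import hashlib

WORKERS = ["Tom", "Dick", "Harry"]

def assign_worker(doc_id: str, workers=WORKERS) -> str:
    """Map a document id to a worker via a stable hash."""
    digest = hashlib.md5(doc_id.encode("utf-8")).hexdigest()
    return workers[int(digest, 16) % len(workers)]

# Distribute 10,000 synthetic document ids and count each worker's share.
counts = {w: 0 for w in WORKERS}
for i in range(10_000):
    counts[assign_worker(f"doc-{i}")] += 1
print(counts)  # each worker receives roughly a third of the documents
```

Because the hash is stable, the same document id always lands on the same worker, so repeated updates to one document are never processed by two workers concurrently.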
Comparing the standard and multi-worker high-availability modes
1. Standard mode
Pros: simpler to deploy.
Cons: slower data processing, because only one worker does the syncing. You can, however, configure the number of goroutines that push documents (elasticsearch-max-conns), which largely compensates for the lack of multiple workers.
2. Multi-worker mode
Pros: near-real-time sync, since multiple workers run in parallel and each worker can additionally use several goroutines to push documents, giving higher concurrency.
Cons: more cumbersome to deploy.
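The compensation mentioned above (one process, many concurrent pushers) can be sketched with a thread pool; the `push_to_es` stub is hypothetical and performs no real network I/O:

```python
from concurrent.futures import ThreadPoolExecutor

def push_to_es(doc: dict) -> int:
    # Stand-in for a bulk push to Elasticsearch (no real network call here).
    return doc["_id"]

docs = [{"_id": i} for i in range(100)]

# A single process, but 8 concurrent pushers --
# analogous to setting elasticsearch-max-conns = 8 in standard mode.
with ThreadPoolExecutor(max_workers=8) as pool:
    pushed = list(pool.map(push_to_es, docs))

print(len(pushed))  # 100 -- every document was pushed
```

The point of the analogy: throughput in standard mode scales with the number of concurrent pushers, not the number of processes, which is why a single well-tuned process can keep up with multi-worker deployments on moderate write volumes.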
Conclusion: the two modes differ little in sync time in practice; standard mode synced 10,000 documents in about 1.5 seconds in local testing and is simpler to deploy, so we ultimately chose standard mode.
For deploying Monstache on EKS, see: https://www.cnblogs.com/agopher/p/15704633.html
References:
Official documentation: https://rwynn.github.io/monstache-site/advanced/#high-availability
Hands-on guide: https://help.aliyun.com/document_detail/171650.html#title-8gf-qh2-3qj