如何使mysql中的数据同步到es中?

方案一:(同步操作,代码侵入性比较高)

在往数据库中增加一条数据的同时,向es中也插入一条

Books.objects.create()
向es中插入一条数据

方案二:使用celery起一个定时任务

在用户低峰的时候,执行定时任务(比如每天晚上00:00),把当天新增的数据查询出来并处理成Json格式存到es中。

向es中插入一条

方案三:使用django信号

-Books.objects.create()
在插入后触发某个信号(post_save)  # 具体看django内置信号总结

方案四:单独做一个服务做同步

在github里面有个用go语言写的开源模块go-mysql-elasticsearch,下载地址:https://github.com/siddontang/go-mysql-elasticsearch

go mysql elasticsearch是一项将mysql数据自动同步到elasticsearch的服务。它首先使用mysqldump获取源数据,然后用binlog增量同步数据。

Golang 支持交叉编译,在一个平台上生成另一个平台的可执行程序

# 1.  Mac 下编译 Linux 和 Windows 64位可执行程序
CGO_ENABLED=0 
GOOS=linux 
GOARCH=amd64 
go build main.go

CGO_ENABLED=0 
GOOS=windows 
GOARCH=amd64 
go build main.go
//CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build


# 2. Linux 下编译 Mac 和 Windows 64位可执行程序
CGO_ENABLED=0 
GOOS=darwin 
GOARCH=amd64 
go build main.go

CGO_ENABLED=0 
GOOS=windows 
GOARCH=amd64 
go build main.go

# 3.  Windows 下编译 Mac 和 Linux 64位可执行程序
SET CGO_ENABLED=0
SET GOOS=darwin
SET GOARCH=amd64
go build main.go

SET CGO_ENABLED=0
SET GOOS=linux
SET GOARCH=amd64
go build main.go

在加入 PingCAP 之前,很长一段时间,我都跟 MySQL 打交道。MySQL 性能强悍,但是在一些全文检索,复杂查询上面并不快,效率堪忧。为了解决快速查的问题,我们之前尝试考虑过 Sphinx,但总觉得使用起来不方便。恰好那时候碰到了 Elasticsearch(ES),立刻就觉得这特么就是我们要的东西。

ES 底层基于 Lucene ,支持分布式,同时还提供了强大的 web 页面,点点鼠标就很容易进行数据查询。但我们的数据是存放在 MySQL 里面,如何将数据实时的导入给 Elasticsearch?

最开始我想到的是用 MySQL trigger 或者 UDF,不过立刻觉得这条路非常不靠谱,所以就把目标放在了 MySQL binlog 上面。

1.MySQL Binlog

要通过 MySQL binlog 将 MySQL 的数据同步给 ES, 我们只能使用 row 模式的 binlog。如果使用 statement 或者 mixed format,我们在 binlog 里面只能知道对应的 query 语句,完全没法知道这条语句到底改了啥数据,所以要从 binlog 里面得到实际的数据,只能用 row 模式。

Row 模式还可以设置 full,noblob 以及 minimal 三种 image 模式,后面两种主要是为了减少空间占用,默认是 full。个人其实最喜欢 full 模式,这样数据最全,而且也觉得空间占用对于现在的硬盘来说不是特别大的问题,毕竟我们还有定期清理 binlog 的机制。

同步 MySQL binlog 就很简单了,按照 MySQL replication 的协议,自己写一个客户端,模拟成 MySQL slave,注册给 MySQL master 就可以了。MySQL master 会实时的将数据的更新通过 binlog event 发送给 slave,然后我们自己解析 event 之后就能得到实际的数据了。

具体实现这里不做过多说明,大家可以参考 MySQL Client/Server Protocol 细致的了解 MySQL 的 protocol,binlog events 等相关知识。我也在 go-mysql项目里面实现了相关的 replication功能。

2.MySQL dump

如果是一个新建 MySQL,我们当然可以通过 binlog 的方式方便的同步数据。但如果我们想同步一个已经运行一段时间的 MySQL ,就可能会有问题了。因为这时候早期的 binlog 文件已经被删除,如果直接开始同步,我们就可能会缺失一部分早期更新的数据。

要解决这个办法也比较容易,参考 MySQL 通用的 backup方式,我们可以先使用 mysqldump 获取当前 MySQL 的整个 snapshot,直接解析生成的 dump 文件,就能得到当前所有的数据。然后在从这个 snapshot 对应的 binlog position 位置开始同步。

整个这套流程我也在 go-mysql 的 canal里面实现。

3.go-mysql-elasticsearch

我们通过 go-mysql 的 canal 组件,能非常方便的同步 MySQL 的数据,那么剩下的事情就简单了,将同步的 MySQL 数据直接发送给 ES 就可以了。
因为 ES 对外提供的 API 非常简单易用,我们非常容易就能写一个 client 与其交互,代码在 这里,不做详细说明了。
我们唯一需要注意的就是设置同步规则,也就是说,当我们同步了一行数据之后,这一行数据到底是如何组装发送给 ES 的。
同步规则在配置文件里面详细说明。首先,我们要定义需要同步的 table,这个我们在 source 里面指定,例如:

[[source]]
schema = "test"

# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]

在上面的例子中,我们需要同步 test 这个 database 里面的几张表。对于一些项目如果使用了分表机制,我们可以用通配符来匹配,譬如上面的 t_[0-9]{4},就可以匹配 table t_0000t_9999

对一个 table,我们需要指定将它的数据同步到 ES 的哪一个 index 的 type 里面。如果不指定,我们默认会用起 schema name 作为 ES 的 index 和 type。例如:

[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"

在上面的例子中,我们将 table t 的数据同步到 ES 的 index 为 test, type 为 t 的下面。

4. 自定义 Field mapping

默认情况下面,我们会使用 table 的 column name 作为 ES 里面的 field name,但有时候我们也可以换成另外的名字,例如:

[[rule]]
schema = "test"
table = "tfield"
index = "test"
type = "tfield"

[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
# Map column `tags` to ES field `es_tags` with array type 
tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"

在上面的例子中,table tfield 的 column id ,我们映射成了 es_id,而 tags 则映射成了 es_tags

上面我们可以注意到 list 这个字段,他显示的告知需要将对应的 column 数据转成 ES 的 array type。这个现在通常用于 MySQL 的 varchar 等类型,我们可能会存放类似 “a,b,c” 这样的数据,然后希望同步给 ES 的时候变成 [a, b, c] 这样的列表形式。

5.Filter Field

[[rule]]
schema = "test"
table = "tfilter"
index = "test"
type = "tfilter"

# Only sync following columns
filter = ["id", "name"]

上面对于 table tfilter,我们只会同步 idname 这两列,其他的都不会同步了。

6.聚合多张表

上面我们说了支持多张表聚合,譬如:

[[rule]]
schema = "test"
table = "t_[0-9]{4}"
index = "test"
type = "t"

在上面的例子中,我们会将所有满足格式 t_[0-9]{4} 的 table 同步到 ES 的 index 为 test,type 为 t 的下面。当然,这些表需要保证 schema 是一致的。

7.小结

可以看到,使用 go-mysql-elasticsearch,我们仅需要在配置文件里面写规则,就能非常方便的将数据从 MySQL 同步给 ES。上面仅仅举了一些简单的例子,go-mysql-elasticserch 现在还支持 parent-child relationship 的同步等。

当然,go-mysql-elasticsearch 还不完善,譬如还不能很好的处理 DDL 的情况,还需要支持更多的同步规则等。如果大家有兴趣,非常欢迎参与进来。

如何使mysql中的数据同步到es中?

上一篇:数据库


下一篇:MySQL-mycat读写分离