前言
相当一部分应用的日志是保存在数据库之中的,这些陈旧又稳定的应用在支撑着业务的运行。它们产生的日志通常来说也是有其价值的,但是如果能够融入到目前较为通用的 Elasticsearch 当中的话,可能有助于降低运维工作量,防止信息孤岛,并且进一步挖掘既有应用和业务的商业价值。
go-mysql-elasticsearch
就是这样一个项目,它可以从 MySQL 的数据表中读取指定数据表的数据,发送到 ElasticSearch 之中。它会使用 mysqldump
命令处理现有存量数据,并借助 binlog 的方式跟踪增量数据,从而保证 Elasticsearch 的数据和 MySQL 数据库中的数据保持同步。下面会简单讲一下这一项目的配置,并试验一个简单例子,最后根据实际情况进行一些改进。
条件和假设
目前该工具支持 MySQL 和 ES 的版本都是 5.x。
MySQL 服务器需要开启 row 模式的 binlog。
因为要使用
mysqldump
命令,因此该进程的所在的服务器需要部署这一工具。这一工具使用 GoLang 开发,需要 Go 1.9+ 的环境进行构建。
可用的 MySQL、Elasticsearch 以及 Kibana 实例。
另外为了进行演示,这里做一点假设:
业务日志表
CREATE TABLE `biz_log` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `receive_content` text, `send_content` text, `fd_receive_content` text, `fd_send_content` text, `log_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4;
四个 text
字段使用 JSON 格式存储了几个不同的日志种类。
工具构建
go get github.com/siddontang/go-mysql-elasticsearch
cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch
make
转换配置
执行 go-mysql-elasticsearch --help
,会看到一系列的参数,最主要的参数就是 -config
,这个参数用于设置转换过程所需的参数配置文件,在源码的 /etc/river.toml
中包含了一份配置样本。作者在这一配置样本中提供了非常详尽的注释,可以对转换过程做出很多定制。但是由于工具本身具备很好的适应能力,加上 ES 的强大功能,只需要一点简单的设置,就能够顺利完成常见任务了。
一个简单的配置:
# MySQL 的相关配置 # 指定用户必须具备复制权限 my_addr = "127.0.0.1:3306" my_user = "root" my_pass = "Flzx3000c" my_charset = "utf8mb4" # ES 相关配置 es_addr = "127.0.0.1:9200" es_user = "" es_pass = "" # 数据源配置 # 以 Slave 模式工作 server_id = 10001 # mysql/mariadb flavor = "mysql" # mysqldump 路径,如果为空或者未设置,会跳过这一环节。 mysqldump = "mysqldump" bulk_size = 128 flush_bulk_time = "200ms" skip_no_pk_table = false [[source]] # 数据库名称 schema = "biz" # 数据表同步范围,支持通配符 tables = ["biz_log"] # 规则定义 [[rule]] # 数据库名称 schema = "biz" # 规则对应的数据表,支持通配符 table = "biz_log" # 目标 ES 索引 index = "biz" # 该规则在 ES 中生成的文档类型 type = "log_db"
同步
配置文件完成之后,就可以执行 ./go-mysql-elasticsearch -config=./river.toml
,日志中会显示首先执行 mysqldump
导出存量数据,然后开始守护进程阶段,跟踪 binlog 并进行同步。
此时打开 Kibana,执行 GET _search
,会看到数据库记录已经进入了 ES 中,并且按照我们定义的规则进行了索引。在守护进行运行期间,如果有新的数据插入,也会同步到 ES 之中。
但是这里我们会发现一个小问题,前面提到的 JSON 字段被作为单一的字符串存入了 ES 索引。这样就根据 JSON 中的特定字段进行搜索的需要就比较费劲了,而我们也知道,如果直接向 ES 提交文档,其中的 JSON 是会被映射为 Object 类型的。如果对 ES 索引进行数据类型的定义,会发现直接将 JSON 字段映射到 Object 类型后,同步过程会失败,返回错误认为将无效内容映射到了这一类型。因此可以推测是字符串并没有使用原有格式提交给 ES。
经过对代码的阅读跟踪,发现在 elastic/client.go
中对数据进行了一次 Json 编码:
default: //for create and index data, err = json.Marshal(r.Data) if err != nil { return errors.Trace(err) }
下面就尝试进行一点改动,使之支持嵌套在字段内容中的 JSON 内容。
JSON
这里我想到了一个简单粗暴的办法就是,对数据报文进行一次检查,如果该字段内容是有效 JSON 的话,就使用 github.com/buger/jsonparser
的 set
方法,将压缩后的 JSON 字符串重新赋值给编码后的 byte[]
。
首先给 BulkRequest
定义一个新方法,用于数据编码
func (b *BulkRequest) encodeData() []byte { jsonResults,_ := json.Marshal(b.Data) // 判断是否有效的 JSON 数据 isJson := func(s string) bool { var js map[string]interface{} return json.Unmarshal([]byte(s), &js) == nil } for key, value := range b.Data { stringValue, ok := value.(string) // 如果字段内容是字符串并且是 JSON 格式 if ok && isJson(stringValue) { // 设置编码后内容该字段的值为原文 jsonResults,_ = jsonparser.Set(jsonResults, []byte(stringValue), key) } } return jsonResults }
然后将原有的 data, err = json.Marshal(r.Data)
替换为 data = r.encodeData()
,再次构建运行。会看到 ES 成功的将 JSON 字段进行了解析,生成了 Object 类型的映射关系。
补充说明
这里引用go-mysql-elasticsearch功能及性能验证 一文的性能测试结果:
1.全量同步 支持:需要安装mysqldump(mysql自带),同步11.5w数据,耗时3分13秒。 全量基于mysqldump,需要将工具和mysql安装在同一个节点,其它方式尚未找到。 2.增量同步 支持。 增量插入20W数据,耗时8分钟。 删除20w条数据,耗时6分。 更新20w条数据,12分钟。
这一工具还有一些其它亮点,例如多表聚合、字段过滤、自定义字段映射等。