使用 go-mysql-elasticsearch 把 MySQL 中的业务日志导入 Elastic

前言

相当一部分应用的日志是保存在数据库之中的,这些陈旧又稳定的应用在支撑着业务的运行。它们产生的日志通常来说也是有其价值的,但是如果能够融入到目前较为通用的 Elasticsearch 当中的话,可能有助于降低运维工作量,防止信息孤岛,并且进一步挖掘既有应用和业务的商业价值。

go-mysql-elasticsearch 就是这样一个项目,它可以从 MySQL 的数据表中读取指定数据表的数据,发送到 ElasticSearch 之中。它会使用 mysqldump 命令处理现有存量数据,并借助 binlog 的方式跟踪增量数据,从而保证 Elasticsearch 的数据和 MySQL 数据库中的数据保持同步。下面会简单讲一下这一项目的配置,并试验一个简单例子,最后根据实际情况进行一些改进。

条件和假设

  • 目前该工具支持 MySQL 和 ES 的版本都是 5.x。

  • MySQL 服务器需要开启 row 模式的 binlog。

  • 因为要使用 mysqldump 命令,因此该进程的所在的服务器需要部署这一工具。

  • 这一工具使用 GoLang 开发,需要 Go 1.9+ 的环境进行构建。

  • 可用的 MySQL、Elasticsearch 以及 Kibana 实例。

另外为了进行演示,这里做一点假设:

业务日志表

CREATE TABLE `biz_log` (  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,  `receive_content` text,  `send_content` text,  `fd_receive_content` text,  `fd_send_content` text,  `log_time` datetime DEFAULT NULL,  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4;

四个 text 字段使用 JSON 格式存储了几个不同的日志种类。

工具构建

  1. go get github.com/siddontang/go-mysql-elasticsearch

  2. cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch

  3. make

转换配置

执行 go-mysql-elasticsearch --help,会看到一系列的参数,最主要的参数就是 -config,这个参数用于设置转换过程所需的参数配置文件,在源码的 /etc/river.toml中包含了一份配置样本。作者在这一配置样本中提供了非常详尽的注释,可以对转换过程做出很多定制。但是由于工具本身具备很好的适应能力,加上 ES 的强大功能,只需要一点简单的设置,就能够顺利完成常见任务了。

一个简单的配置:

# MySQL 的相关配置
# 指定用户必须具备复制权限
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = "Flzx3000c"
my_charset = "utf8mb4"

# ES 相关配置
es_addr = "127.0.0.1:9200"
es_user = ""
es_pass = ""

# 数据源配置
# 以 Slave 模式工作
server_id = 10001
# mysql/mariadb
flavor = "mysql"

# mysqldump 路径,如果为空或者未设置,会跳过这一环节。
mysqldump = "mysqldump"
bulk_size = 128
flush_bulk_time = "200ms"
skip_no_pk_table = false

[[source]]
# 数据库名称
schema = "biz"
# 数据表同步范围,支持通配符
tables = ["biz_log"]

# 规则定义
[[rule]]
# 数据库名称
schema = "biz"
# 规则对应的数据表,支持通配符
table = "biz_log"
# 目标 ES 索引
index = "biz"
# 该规则在 ES 中生成的文档类型
type = "log_db"

同步

配置文件完成之后,就可以执行 ./go-mysql-elasticsearch -config=./river.toml,日志中会显示首先执行 mysqldump 导出存量数据,然后开始守护进程阶段,跟踪 binlog 并进行同步。

此时打开 Kibana,执行 GET _search,会看到数据库记录已经进入了 ES 中,并且按照我们定义的规则进行了索引。在守护进行运行期间,如果有新的数据插入,也会同步到 ES 之中。

但是这里我们会发现一个小问题,前面提到的 JSON 字段被作为单一的字符串存入了 ES 索引。这样就根据 JSON 中的特定字段进行搜索的需要就比较费劲了,而我们也知道,如果直接向 ES 提交文档,其中的 JSON 是会被映射为 Object 类型的。如果对 ES 索引进行数据类型的定义,会发现直接将 JSON 字段映射到 Object 类型后,同步过程会失败,返回错误认为将无效内容映射到了这一类型。因此可以推测是字符串并没有使用原有格式提交给 ES。

经过对代码的阅读跟踪,发现在 elastic/client.go 中对数据进行了一次 Json 编码:

    default:        //for create and index
        data, err = json.Marshal(r.Data)        if err != nil {            return errors.Trace(err)
        }

下面就尝试进行一点改动,使之支持嵌套在字段内容中的 JSON 内容。

JSON

这里我想到了一个简单粗暴的办法就是,对数据报文进行一次检查,如果该字段内容是有效 JSON 的话,就使用 github.com/buger/jsonparser 的 set 方法,将压缩后的 JSON 字符串重新赋值给编码后的 byte[]

首先给 BulkRequest 定义一个新方法,用于数据编码

func (b *BulkRequest) encodeData() []byte {
     jsonResults,_ := json.Marshal(b.Data)     // 判断是否有效的 JSON 数据
     isJson := func(s string) bool {         var js map[string]interface{}         return json.Unmarshal([]byte(s), &js) == nil

     }     for key, value := range b.Data {
         stringValue, ok := value.(string)         // 如果字段内容是字符串并且是 JSON 格式
         if ok && isJson(stringValue) {             // 设置编码后内容该字段的值为原文
             jsonResults,_ = jsonparser.Set(jsonResults, []byte(stringValue), key)
            }
     }     return jsonResults
 }

然后将原有的 data, err = json.Marshal(r.Data) 替换为 data = r.encodeData(),再次构建运行。会看到 ES 成功的将 JSON 字段进行了解析,生成了 Object 类型的映射关系。

补充说明

这里引用go-mysql-elasticsearch功能及性能验证 一文的性能测试结果:

1.全量同步
支持:需要安装mysqldump(mysql自带),同步11.5w数据,耗时3分13秒。
全量基于mysqldump,需要将工具和mysql安装在同一个节点,其它方式尚未找到。
2.增量同步
支持。
增量插入20W数据,耗时8分钟。
删除20w条数据,耗时6分。
更新20w条数据,12分钟。

这一工具还有一些其它亮点,例如多表聚合、字段过滤、自定义字段映射等。


上一篇:使用Docker为Kibana设置密码


下一篇:IDEA--代码带入码云Gitee