mysql 作为成熟稳定的数据持久化解决方案,广泛地应用在各种领域,但是有些时候,我们在做查询时,由于查询条件的多样、变化多端(比如根据时间查、根据名称模糊查、根据id查等等),或者查询的数据来自很多不同的库表或者系统,这时就很难以一个较快的速度(几百毫秒)直接获取我们想要的数据,而 elasticsearch 作为数据分析领域的佼佼者,刚好可以弥补这项不足,而我们要做的只需要将 mysql 中的数据同步到 elasticsearch 中即可,而 logstash 刚好就是一个同步神器,能够很好的满足我们的需求。
一:版本说明
logstash:7.6.1
elasticsearch:7.5.2
jdk:1.8
下载对应平台的压缩包,解压即可使用。
二:原理
使用logstash自带的logstash-inpu-jdbc插件,实现从mysql向es同步数据
官方插件使用说明:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
三:配置
使用 logstash-input-jdbc 插件读取 mysql 的数据,原理就是定时执行一个 sql,然后将 sql 执行的结果写入到流中;增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段.
input {
# 使用logstash-input-jdbc插件
jdbc {
# mysql连接驱动包
jdbc_driver_library => "/data/www/logstash-7.6.1/mysql-connector-java-5.1.44.jar"
# mysql连接驱动类
jdbc_driver_class => "com.mysql.jdbc.Driver"
# mysql jdbc连接串
jdbc_connection_string => "jdbc:mysql://xxxxxxxxxxx"
# 设置时区
jdbc_default_timezone => "Asia/Shanghai"
# 设置连接数据库的用户名和密码
jdbc_user => "xxx"
jdbc_password => "xxxxx"
# 开启分页
jdbc_paging_enabled => "true"
# 开启分页后,每页数据条数大小
jdbc_page_size => "1000"
# 使用本地时区
plugin_timezone => "local"
# 开启大小写敏感
lowercase_column_names => "false"
# 同时时间间隔,分别对应”分 时 月 周 年“ 后面可以设置时区,也可不设,如果注释schedule,则只运行一次同步,进程及停止
schedule => "*/5 * * * * Asia/Shanghai"
# 读取数据库使用的SQL文件,其中有一个需要特别说明的关键字:sql_last_value,此值是读取last_run_metadata_path文件内记录的值
statement_filepath => "/data/www/logstash-7.6.1/conf/xxxxxxx.sql"
# 记录每次同步tracking_column_type追踪的值
last_run_metadata_path => "/data/www/logstash-7.6.1/conf/xxxx_modify_time"
# 开启记录最后同步的值
record_last_run => "true"
# 设置false为记录时间戳,true为其他字段的值
use_column_value => "false"
# 追踪字段的类型,可选timestamp、numeric
tracking_column_type => timestamp
# 具体跟踪的同步表字段
tracking_column => "modify_time"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => "false"
}
}
#filter部分对数据进行处理,此处只对@timestamp字段处理,设置为本地时间
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
}
output {
#向es同步数据的相关配置
elasticsearch {
hosts => ["127.0.0.1:9200","127.0.0.1:9300"]
#将数据中id字段设置为es中索引的_id,不使用es自动生成的随机_id,提供写入速度
document_id => "%{id}"
#定义es中索引名称,如es没有创建或不存在,会自动创建,自动创建的索引可能会不满足要求,建议使用自行创建的索引
index => "xxxx_index"
document_type => "_doc"
}
# 以下配置为debug,可在测试中开启,方便定位问题,生产或正式使用务必关闭,以免产生大量日志
stdout {
codec => rubydebug
}
}
- jdbc_driver_library
mysql-connector-java-5.1.44.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。
- jdbc_driver_class
驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了
- sql_last_value
标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。
- statement
执行同步的sql语句,可以同步部分数据;
- statement_filepath
存储执行同步的sql语句,不和statement同时使用;
- schedule
定时器,表示每隔多长时间同步一次数据,格式类似crontab;
- tracking_column
表示表中哪一列用于判断logstash同步的位置信息,与sql_last_value比较判断是否需要同步这条数据;
- tracking_column_type
racking_column指定列的类型,支持两种类型:numeric(默认)、timestamp;
- last_run_metadata_path
存储sql_last_value值的文件名称及位置。
- document_id
生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。
注意⚠️
1、同步单张表可以使用:logstash -f /path/to/xxxx.conf
2、同步多张表时,可通过pipeline管道的方式
3、当表数据超过百万级别时,建议分批同步,可按照每100w条数据同步一次来进行,以加快同步速度,具体可使用表id的区间来确定同步数量,则在SQL中增加id区间值。
多表同步的pipeline.yml配置如下:
- pipeline.id: xxxx1
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx1.conf
- pipeline.id: xxxx2
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx2.conf
- pipeline.id: xxxx3
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx3.conf
- pipeline.id: xxxx4
pipeline.workers: 1
path.config: /data/www/logstash-7.6.1/member_conf/xxxx4.conf
四:启动
单表启动,指定conf配置文件的位置
bin/logstash -f config/logstash-mysql-es.conf > llogstash.log 2>&1 &
当使用多表同步时,启动时,不需要指定具体的conf配置文件,即:
bin/logstash > logstash.log 2>&1 &