宗旨:
我们今天,使用logstash同步一下数据库,mysql-->ES
废话不多说,直接上
一,下载logstash
下载地址:Logstash 7.17.0 | Elastic
PS:
es,kibana,logstash 三个版本一定要一致(包括ik分词器)
前面我用的都是 7.17.0的版本。
下载完,解压
二,安装ruby环境
下载地址: Downloads
无脑下一步
三,修改pipelines.yml
位置:
把注释去掉(保存为 UTF-8格式!!!这里是个坑)
四,创建配置文件
先创建一个文件夹mysqlconfig
大家自己改一下前面的路径。
D:\elasticsearch\logstash-7.17.0\mysqlconfig
然后创建文件 mysql.conf
内容如下:
注意几点:
1 jdbc的userSSL=false 关闭 SSL
2 数据表中,最好有个自动生成的更新时间,ES可以根据这个字段取增量更新(后面我会放上sql建表语句)
3 jdbc_driver_library 是你的mysql驱动,我的是通过maven下载的,你们自己改改
4 last_run_metadata_path 这个路径改一下,文件名不用改
5 statement => "select * from myTest where sysUpdateTime > date_add(:sql_last_value,INTERVAL 8 HOUR)"
这一句大家自己改一下
input { stdin { } jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/ty_test?userSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => "123456" # the path to our downloaded jdbc driver jdbc_driver_library => "C:\Users\pc\.m2\repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #要执行的sql文件 #statement_filepath => "/conf/course.sql" statement => "select * from myTest where sysUpdateTime > date_add(:sql_last_value,INTERVAL 8 HOUR)" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "D:/elasticsearch/logstash-7.17.0/config/logstash_metadata" } } output { elasticsearch { hosts => "localhost:9200" #hosts => ["localhost:9200","localhost:9202","localhost:9203"] #ES索引库名称 index => "mytest" document_id => "%{id}" document_type => "doc" } stdout { #日志输出 codec => json_lines } }
五,创建数据库表
CREATE TABLE `myTest` ( `id` int(11) NOT NULL, `name` varchar(50) DEFAULT NULL, `hobby` varchar(50) DEFAULT NULL, `amount` decimal(18,2) DEFAULT NULL, `sysUpdateTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- 录入数据 insert into myTest(id,`name`,hobby,amount) select 1,'1','1',80 union select 2,'2','2',40 union select 3,'3','3',40 union select 4,'4','4',100 union select 5,'5','5',100 union select 6,'6','6',50
六,启动测试
1 启动 ES bin下的bat
2 启动 kibana bin下的bat
3 启动 logstash
上面两个不讲了,我们说一下logstash同步命令
打开cmd,进入logstash的bin目录。
执行 logstash -f ../mysqlconfig/mysql.conf
第一次会全量更新数据。
接着每1分钟更新一次。
当你在mysql中录入数据,就会看到如下:(数据没有变更则只是单纯的一个select语句)
最后
可以去kibana查看一下