elasticsearch+logstash_jdbc 实现mysql数据实时同步至es

jdk安装1.8版本,es、ls、ik、kibana版本一致我这里使用的6.6.2版本

安装es
tar xf elasticsearch-6.6.2.tar.gz
mv elasticsearch-6.6.2 /home/heron/elasticsearch
## 使用普通用户启动
chown -R heron.heron /home/heron/elasticsearch
su - heron
cd /home/heron/elasticsearch

cat config/elasticsearch.yml

##修改前三行,添加最后两行到行尾
node.name: test-01
network.host: 10.10.1.231
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"

cat config/jvm.options

###修改为512M
-Xms512m
-Xmx512m

安装ik+pinyin分词器

tar xf elasticsearch-analysis-pinyin-6.6.2.zip 

mv elasticsearch-analysis-pinyin-6.6.2 /home/heron/elasticsearch/plugins/pinyin

tar xf elasticsearch-analysis-ik-6.6.2.zip 

mv elasticsearch-analysis-ik-6.6.2 /home/heron/elasticsearch/plugins/ik

启动

/home/heron/elasticsearch/bin/elasticsearch -d

创建thinkcmf5索引和shop_goods表

curl -XPUT "http://10.10.1.231:9200/thinkcmf5"   -H 'Content-Type: application/json' -d'
{
"settings":{
"number_of_shards":"3",
"index.refresh_interval":"15s",
"index":{
"analysis":{
"analyzer":{
"ik_pinyin_analyzer":{
"type":"custom",
"tokenizer":"ik_smart",
"filter":"pinyin_filter"
}
},
"filter":{
"pinyin_filter":{
"type":"pinyin",
"keep_first_letter": false
}
}
}
}
}
}' curl -XPUT "http://10.10.1.231:9200/thinkcmf5/_mapping/shop_goods" -H 'Content-Type: application/json' -d'
{
"properties": {
"goodsname":{
"type": "text",
"analyzer": "ik_smart",
"search_analyzer": "ik_smart",
"fields": {
"pinyin":{
"type":"text",
"analyzer": "ik_pinyin_analyzer",
"search_analyzer": "ik_pinyin_analyzer"
}
}
}
}
}'
安装logstash-input-jdbc插件

首先安装logstash

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.2.tar.gz
tar xf logstash-6.6.2.tar.gz
mv logstash-6.6.2 /home/heron/logstash

安装jdbc插件

cd  /home/heron/logstash
./bin/logstash-plugin install logstash-input-jdbc

准备

配置文件:mysql.conf

sql文件:my_sql2.sql

mysql 的java 驱动包 :mysql-connector-java-5.1.44-bin.jar

cat /home/heron/logstash/mysql.conf

input {
jdbc {
jdbc_driver_library => "/home/heron/logstash/config/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://10.10.1.231:3306/thinkcmf5"
jdbc_user => "root"
jdbc_password => "dg9WA1nv"
statement_filepath => "./data/my_sql2.sql"
schedule => "* * * * *"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000" }
}
filter {
mutate {
rename => { "[host][name]" => "host" }
}
}
output{
stdout {
codec => rubydebug
}
elasticsearch {
index => "thinkcmf5"
document_type => "shop_goods"
hosts => "10.10.1.231:9200"
}
}

cat /home/heron/logstash/data/my_sql2.sql

###全量
select * from shop_goods
###增量,由于开始使用时间戳存储时间,ls变量使用实际时间,因此我转换了一下
select * from shop_goods where createtime > unix_timestamp(:sql_last_value)

启动

/home/heron/logstash/bin/logstash -f /home/heron/logstash/mysql.conf
安装kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.2.tar.gz
tar xf kibana-6.6.2.tar.gz
mv kibana-6.6.2 /home/heron/kibana

cat config/kibana.yml

###修改文件
server.port: 5601
server.host: "10.10.1.231"
elasticsearch.hosts: ["http://10.10.1.231:9200"]

启动

/home/heron/kibana/bin/kibana -d

上一篇:在 .NET Core 中结合 HttpClientFactory 使用 Polly(中篇)


下一篇:PHP中的面向对象思想