目录
如何将MySQL数据库中的数据同步至Elasticsearch中?
kibana mapping设置
PUT twitter1
{
"mappings": {
"properties": {
"location": {
"type": "geo_point"
},
"user": {
"type": "keyword"
}
}
}
}
店铺的Mapping
{
"properties": {
"id": {
"type": "keyword"
},
"storeName": {
"type": "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"storeIntroduction": {
"type": "text"
},
"storeBrand": {
"type": "text"
},
"openDate": {
"type": "date"
},
"storePhoto": {
"type": "keyword"
},
"storeTags": {
"type": "text"
}
}
}
/**最开始的是id字段类型是keyword,说明通过关键字进行搜索。
storeName字段定义为text类型是支持全文索引的,同时通过fields定义keyword类型说明也可以做关键字查询。
StoreIntroduction字段定义为text类型。
storeBrand字段定义为text类型。
openDate字段定义为date类型。
storePhoto字段定义为keyword类型。
StoreTags字段定义为text类型。*/
如何将MySQL数据库中的数据同步至Elasticsearch中?
logstash在程序中的位置
如下所示:Logstash左边是数据源,例如报表、结构化数据、统计数据等等,通过Logstash以后 会将这些数据进行分析、归档、监控、告警。这里我们可以将其理解为ETL的工具,它可以面向多个数据源,将数据拉入Logstash进行处理之后,最后将其输出成你想要个格式。
如下所示:从左往右看,最左边就是我们所说的数据源,在本例中的数据源就是MySQL数据库,中间灰色的部分就是Logstatsh其包括三个部分:Input、Filter和Output。这三个部分都包含在Pipeline的管道中。这个三个部分会对输入的数据源进行处理然后生成最终的Elasticsearch索引文件。
-
Pipeline:包括input-filter-output的3阶段的处理流程,它是通过队列进行管理的。
-
Logstash Event:它是数据信息在Logstash中流转的形式,数据源的数据通过input进入到Logstash以后就会被转化为Logstash Event,这个Event会经过filter过滤最终event会通过output输出到目标数据源,也就是Elasticsearch中。
左边的Raw Data作为原始的数据源文件通过Input进入到Logstash中,此时会使用codec-decode对数据源进行转换,转化为Logstash Event以后经过Filter进行过滤的操作,然后还是以Logstash Event的形式通过Output中的codec-encode模块将其转换成最终的Data,并且输出。
Pipeline 的配置举例
mysql.conf
input {
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.56.10:3306/elasticsearch"
jdbc_user => "root"
jdbc_password => "root"
statement => "SELECT * FROM t_goods_store"
use_column_value => true
tracking_column_type => "numeric"
tracking_column => "id"
last_run_metadata_path => "syncpoint_table"
}
}
filter {
ruby{
code => "event.set('openDate', event.get('open_date').time.localtime.strftime('%Y-%m-%d'))"
}
ruby{
code => "event.set('storeName',event.get('store_name'));
event.set('storeIntroduction',event.get('store_introduction'));
event.set('storeBrand',event.get('store_brand'));
event.set('storePhoto',event.get('store_photo'));
event.set('storeTags',event.get('store_tags'));"
}
mutate {
remove_field => ["tags","@version","@timestamp","open_date","store_name","store_introduction","store_brand","store_photo","store_tags"]
}
}
output {
elasticsearch{
hosts => ["192.168.56.10:9200"]
index => "store_index"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
在input的部分需要获取数据源的数据信息,在店铺搜索案例中,需要通过MySQL数据库获取店铺的信息。因此在Input节点中需要对MySQL进行操作,而连接数据库是该操作中最主要的动作,而这一操作主要都在jdbc节点下完成。
其主要是完成对jdbc相关配置,包括如下:
-
jdbc_driver_library:定义连接MySQL数据库驱动的jar包。
-
jdbc_driver_class:定义MySQL数据库的驱动类。
-
jdbc_connection_string:数据库连接串。
-
jdbc_user:访问数据库用户名。
-
jdbc_password :访问数据库密码。
-
Statement:定义需要获取数据的SQL语句。
-
use_column_value:使用递增列的值
-
tracking_column_type:递增字段的类型,numeric表示数值类型,
-
tracking_column:递增字段的名称
-
last_run_metadata_path :同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
Input将数据载入到Logstash之后就轮到Filter过滤器上场了,filter节点中通过ruby定义的过滤插件对数据处理进行定义。如图2 所示,
-
Ruby的第一段代码将open_date字段中保存的日期类型数据进行了格式化。
-
Ruby的第二段代码将MySQL中字段的命名转换为驼峰命名的方式。
-
Mutate节点将会保留过滤器中的字段命名方式,而移除MySQL中字段的命名方式。
完成input和filter的配置,最后来看Output的配置。由于输出的数据目标是Elasticsearch的索引文件,因此需要加入Elasticsearch的节点:
-
Hosts:对Elasticsearch的IP和端口进行定义。
-
Index:对要保存的Elasticsearch索引进行定义。
-
Document_id:由于索引中一个document id就是一条记录,因此这里使用了自增的id变量作为document id的值。
-
Stdout:标准输出的格式以JSON格式为主。
实践同步
下载logstach, 并在logstash-7.9.3/logstash-core/lib/jars目录下放入mysql-connector-java-8.0.22.jar即可使用jdbc链接数据库,而且在input中也不必指定jdbc_driver_library。
之后执行命令
bin/logstash -f config/mysql.conf
之后就是改造es中的分页查询变为从ES中查询。
https://codechina.csdn.net/JunMa_First/es.git 代码地址
学自: