有没有又快又好的方法?接收同事是非开发人员,如果不写一行代码(脚本)就更好了!
2、方案探讨
2.1 前置认知
比较成熟同步方案选型。
Mysql 到 Elasticsearch 同步选定:logstash。
2.2 Json 字段的处理方案
2.2.1 方案一:遍历 Mysql,解析Json。
逐行遍历 Mysql,把 Json 字符串字段解析为单个字段,更新到Mysql中。
然后,logstash 同步到 Elasticsearch。
优点:很好理解,切实可行。
缺点:需要写解析代码,且涉及 Mysql 的逐行更新操作,慢且效率低。
2.2.2 方案二:logstash 中间环节用 json filter 插件过滤搞定 Json 串解析。
在 logstash 中间 filter 环节,加上 json 串的过滤。
举例如下(类似):
filter {
json {
source => "message",
target => "doc"
}
}
实战参考:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html
优点:少了代码解析环节,借助插件实现。
缺点:需要修改 logstash 同步脚本,有一点学习成本。
2.2.3 方案三:Ingest 数据预处理搞定 json 解析。
既然 logstash json filter 插件能做数据解析,那么,与之对标的 Ingest 管道预处理中的 json processor 等 processor 组合肯定也能搞定。
优点1:少了代码解析环节,借助 Ingest processor 组合实现复杂数据预处理功能。
优点2:相比 logstash filter 更通俗易懂,小白也能快速上手。
缺点:占无。
3、实战一把
如前分析,方案一、二 也能搞定。
但是,方案三更方便,更适合技术小白人员甚至非技术人员。
我们就以方案三实战一把。
3.1 创建预处理管道
PUT _ingest/pipeline/text2json_pipeline
{
"description": "describe pipeline",
"processors": [
{
"json": {
"field": "wb_detail",
"target_field": "wb_json"
}
},
{
"script": {
"source": """
ctx.loc = ctx.wb_json.loc;
ctx.cont = ctx.wb_json.wc;
ctx.author = ctx.wb_json.usn;
ctx.area = ctx.wb_json.uloc;
ctx.url = ctx.wb_json.sr;
"""
}
},
{
"remove": {
"field": "wb_json"
}
}
]
}
如上所示,应用了三个 process。
processor 1:json 处理。
将 wb_detail 源字符串 变成 wb_json json串。
wb_json 属于中间过度字段。
processor 2:script 处理。
将 wb_json json 串中的字段逐个字段切分。
processor 3:remove 删除字段处理。
删除中间过度字段 wb_json。
3.2 创建索引,并指定 default_pipeline
PUT test-003
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": "30s",
"index.default_pipeline":"text2json_pipeline"
},
"mappings": {
"properties": {
"area": {
"type": "text",
"analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"author": {
"type": "keyword"
},
"cont": {
"type": "text",
"analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"id": {
"type": "long"
},
"loc": {
"type": "keyword"
},
"publish_time": {
"type": "date"
},
"publish_timestamp": {
"type": "keyword"
},
"update_time": {
"type": "date"
},
"url": {
"type": "keyword"
},
"wb_detail": {
"type": "keyword"
},
"wb_id": {
"type": "keyword"
}
}
}
}
通过 default_pipeline 提前指定预处理管道的方式非常巧妙,避免了一次 reindex 操作。
相当于在写入环节同时做了数据的处理。
3.3 logstash 数据同步
之前同步讲的很多了,这里就不做具体字段含义的讲解,基本见名释义,很好理解。不明白的读者,留言讨论或者加 wx:elastic 6 讨论。
input {
stdin {
}
jdbc {
# mysql jdbc connection string to our backup databse
jdbc_connection_string => "jdbc:mysql://172.21.0.x:3306/weibo_base"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => "XXXXX"
record_last_run => "true"
use_column_value => "true"
tracking_column => "id"
last_run_metadata_path => "/home/elasticsearch/logstash-7.6.0/sync/test_info"
clean_run => "false"
# the path to our downloaded jdbc driver
jdbc_driver_library => "/home/elasticsearch/mysql-connector-java-5.1.47.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "500"
# 以下对应着要执行的sql的绝对路径
statement_filepath => "/home/elasticsearch/logstash-7.6.0/sync/jdbc_test.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
filter {
}
output {
elasticsearch {
#ESIP地址与端口
hosts => "172.21.0.x:9200"
#ES索引名称(自己定义的)
index => "test-003"
user => "elastic"
password => "XXXXXX"
#自增ID编号
document_id => "%{id}"
}
stdout {
#以JSON格式输出
codec => json_lines
}
}
以上三步,搞定。
4、看效果
有图有真相。
数据源 json 字符串已经拆分为独立字段:area、loc、author 等。
拆分结果达到预期,就加了管道预处理一下,没有写一行脚本。