更多文章查看独家下载 | 《Elasticsearch 八大经典应用》独享大咖场景化应用的秘密>>>
在之前的文章 “Observability:使用 Elastic Stack 分析地理空间数据 (一)”,我详述了如何从 OpenSky Network API 接口把数据导入到 Elasticsearch,并对这些数据进行可视化分析。也许针对很对的情况这个已经很满足了,因为它确实可以帮我们从很多实时数据中提取很多有用的东西。
在今天的文章中,我们将参考之前的文章 “如何使用 Elasticsearch ingest 节点来丰富日志和指标” 。我们可以利用 Elasticsearch ingest 节点来更加丰富我们的数据,并对这些数据做更进一步的的分析。
为了达到这个目的,我们必须首先了解在之前索引中的 icao 字段。这个字段的意思是:
ICAO 机场代码或位置指示器是由四个字母组成的代码,用于指定世界各地的机场。 这些代码由国际民用航空组织定义并发布在国际民航组织7910号文件:位置指示器中,供空中交通管制和航空公司运营(例如飞行计划)使用。
我们之前的每个文档是这样的:
{
"velocity" : 0.0,
"icao" : "ad0851",
"true_track" : 264.38,
"time_position" : 1591190152,
"callsign" : "AAL2535",
"origin_country" : "United States",
"position_source" : "ADS-B",
"spi" : false,
"request_time" : 1591190160,
"last_contact" : 1591190152,
"@timestamp" : "2020-06-03T13:16:03.723Z",
"on_ground" : true,
"location" : "32.7334,-117.2035"
}
另外,我们可以在地址 https://opensky-network.org/datasets/metadata/ 找到一个如下文件:
在这里,我们可以找到一个叫做 aircraftDatabase.csv 的文件。它里面的内容如下:
在上面的表格中,我们发现有一个叫做 icao24 的字段。这个字段和我们之前的文档可以进行关联,从而我们可以得到更多关于某个航班的更多信息。
创建 enrich index
由于下载的文档时一个是一个 csv 的文件。我们可以使用 data visualizer 来导入。
点击上面的 Override settings 链接:
点击 Apply 按钮:
点击上面的 Import 按钮:
我们把这个索引的名字称作为 aircraft。点击 Advaned:
再次确认 mapping,如果没有问题的话,点击 Import 按钮:
由于这个文件比较大,所以需要一点时间来进行导入:
等完成后,我们可以在 Elasticsearch 中找到一个叫做 aircraft 的索引:
上面显示有一个新的 aircraft 的索引生成了。
创建 Enrich policy
接下来,我们来创建 enrich policy。它告诉我们如何丰富数据。在 Kibana 中打入如下的命令:
PUT /_enrich/policy/flights_policy
{
"match": {
"enrich_fields": [
"acars",
"adsb",
"built",
"category_description",
"engines",
"first_flight_date",
"icao_aircraft_type",
"line_number",
"manufacturer_icao",
"manufacturer_name",
"model",
"modes",
"notes",
"operator",
"operator_callsign",
"operator_iata",
"operator_icao",
"owner",
"reg_until",
"registered",
"registration",
"seat_configuration",
"serial_number",
"status",
"test_reg",
"type_code"
],
"indices": [
"aircraft"
],
"match_field": "icao"
}
}
我们使用 execute enrich policy API 为该策略创建enrich索引:
POST /_enrich/policy/flights_policy/_execute
接着,我们创建一个叫做 flights_aircraft_enrichment 的 pipeline:
PUT /_ingest/pipeline/flights_aircraft_enrichment
{
"description": "joins incoming ADSB state info with richer aircraft metadata",
"processors": [
{
"enrich": {
"field": "icao",
"policy_name": "flights_policy",
"target_field": "aircraft"
}
}
]
}
到此为止,我们已经成功地创建了 丰富策略。接下来,我们将展示如何使用这个 pipeline 来丰富我们的数据。
丰富数据
为了能够使用我们上面定义好的 pipeline,我们重参考之前的文章 “Observability:使用 Elastic Stack 分析地理空间数据 (一)”里的 fligths_logstash.conf 文件,并修改如下的 output 部分:
output {
stdout {
codec => rubydebug
}
elasticsearch {
manage_template => "false"
index => "flights"
# pipeline => "flights_aircraft_enrichment"
hosts => "localhost:9200"
}
}
我们把上面的这一行的注释拿掉:
# pipeline => "flights_aircraft_enrichment"
这样变成了:
output {
stdout {
codec => rubydebug
}
elasticsearch {
manage_template => "false"
index => "flights"
pipeline => "flights_aircraft_enrichment"
hosts => "localhost:9200"
}
}
在启动 Logstash 之前,我们可以先删除之前的 flights 索引:
DELETE flights
再接着执行如下的命令:
PUT flights
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"baro_altitude": {
"type": "float"
},
"callsign": {
"type": "keyword"
},
"geo_altitude": {
"type": "float"
},
"icao": {
"type": "keyword"
},
"last_contact": {
"type": "long"
},
"location": {
"type": "geo_point"
},
"on_ground": {
"type": "boolean"
},
"origin_country": {
"type": "keyword"
},
"position_source": {
"type": "keyword"
},
"request_time": {
"type": "long"
},
"spi": {
"type": "boolean"
},
"squawk": {
"type": "long"
},
"time_position": {
"type": "long"
},
"true_track": {
"type": "float"
},
"velocity": {
"type": "float"
},
"vertical_rate": {
"type": "float"
}
}
}
}
重新运行 Logstash:
sudo ./bin/logstash -f fligths_logstash.conf
我们在 Kibana 中检查 flights 的 mapping:
GET flights/_mapping
我们可以看到一些新增加的各个新字段:
我们可以通过 search:
"_source" : {
"aircraft" : {
"owner" : "Wells Fargo Trust Co Na Trustee",
"reg_until" : "2021-04-30",
"modes" : false,
"built" : "1984-01-01",
"acars" : false,
"manufacturer_icao" : "BOEING",
"serial_number" : "23018",
"manufacturer_name" : "Boeing",
"icao_aircraft_type" : "L2J",
"operator_callsign" : "GIANT",
"operator_icao" : "GTI",
"engines" : "GE CF6-80 SERIES",
"icao" : "a8a763",
"registration" : "N657GT",
"model" : "767-281",
"type_code" : "B762",
"adsb" : false
},
"true_track" : 272.81,
"velocity" : 5.14,
"spi" : false,
"origin_country" : "United States",
"@timestamp" : "2020-06-04T10:41:00.558Z",
"request_time" : 1591267250,
"time_position" : 1591267168,
"last_contact" : 1591267168,
"callsign" : "GTI165",
"icao" : "a8a763",
"location" : "39.0446,-84.6505",
"on_ground" : true,
"position_source" : "ADS-B"
}
}
我们可看到一个叫做 aircraft 的字段,它含有这个飞机所有被丰富的信息。
运用 Kibana 分析数据
找出前10的飞机型号
因为有新的字段进来,所以我们必须重新创建新的 inde pattern:
我们可以看到最多的是 PC-12/47E 这个机型。
找出飞机制造商的分布
我们看到 BOING 公司的市场份额是最大的。AIRBUS 处于第二的位置。
飞机机龄分布
我们可以看出来最多的飞机是2019年生产的。
飞机机型和飞行高度的关系
可以看出来 A320-214 飞机飞的是最高的。
Graph
运用 Graph 来找出数据直接的关系。如果你对 Graph 还不是很了解的话,请参阅我之前的教程 “Elastic Graph 介绍”。
点击 Create graph:
点击 Select a data source:
选择 flights* :
点击 Add fields:
添加 fields:
我们需要保持这个 graph。然后进行搜索:
从上面,我们可看出来 BOING 和我们想要的各个字段之间的关系。
我们从收集的数据可以有更多的其它的分析。在这里,我就不一一枚举了。你们可以做任何你想要的分析。