Observability:使用 Elastic Stack 分析地理空间数据

更多文章查看独家下载 | 《Elasticsearch 八大经典应用》独享大咖场景化应用的秘密>>>

在之前的文章 “Observability:使用 Elastic Stack 分析地理空间数据 (一)”,我详述了如何从 OpenSky Network API 接口把数据导入到 Elasticsearch,并对这些数据进行可视化分析。也许针对很对的情况这个已经很满足了,因为它确实可以帮我们从很多实时数据中提取很多有用的东西。

在今天的文章中,我们将参考之前的文章 “如何使用 Elasticsearch ingest 节点来丰富日志和指标” 。我们可以利用 Elasticsearch ingest 节点来更加丰富我们的数据,并对这些数据做更进一步的的分析。

Observability:使用 Elastic Stack 分析地理空间数据

为了达到这个目的,我们必须首先了解在之前索引中的 icao 字段。这个字段的意思是:

ICAO 机场代码或位置指示器是由四个字母组成的代码,用于指定世界各地的机场。 这些代码由国际民用航空组织定义并发布在国际民航组织7910号文件:位置指示器中,供空中交通管制和航空公司运营(例如飞行计划)使用。

我们之前的每个文档是这样的:


{
  "velocity" : 0.0,
  "icao" : "ad0851",
  "true_track" : 264.38,
  "time_position" : 1591190152,
  "callsign" : "AAL2535",
  "origin_country" : "United States",
  "position_source" : "ADS-B",
  "spi" : false,
  "request_time" : 1591190160,
  "last_contact" : 1591190152,
  "@timestamp" : "2020-06-03T13:16:03.723Z",
  "on_ground" : true,
  "location" : "32.7334,-117.2035"
}

另外,我们可以在地址 https://opensky-network.org/datasets/metadata/ 找到一个如下文件:

Observability:使用 Elastic Stack 分析地理空间数据

在这里,我们可以找到一个叫做 aircraftDatabase.csv 的文件。它里面的内容如下:

Observability:使用 Elastic Stack 分析地理空间数据

在上面的表格中,我们发现有一个叫做 icao24 的字段。这个字段和我们之前的文档可以进行关联,从而我们可以得到更多关于某个航班的更多信息。

创建 enrich index

由于下载的文档时一个是一个 csv 的文件。我们可以使用 data visualizer 来导入。

Observability:使用 Elastic Stack 分析地理空间数据

Observability:使用 Elastic Stack 分析地理空间数据

点击上面的 Override settings 链接:
Observability:使用 Elastic Stack 分析地理空间数据

点击 Apply 按钮:
Observability:使用 Elastic Stack 分析地理空间数据

点击上面的 Import 按钮:
Observability:使用 Elastic Stack 分析地理空间数据

我们把这个索引的名字称作为 aircraft。点击 Advaned:
Observability:使用 Elastic Stack 分析地理空间数据

再次确认 mapping,如果没有问题的话,点击 Import 按钮:
Observability:使用 Elastic Stack 分析地理空间数据

由于这个文件比较大,所以需要一点时间来进行导入:
Observability:使用 Elastic Stack 分析地理空间数据

等完成后,我们可以在 Elasticsearch 中找到一个叫做 aircraft 的索引:
Observability:使用 Elastic Stack 分析地理空间数据

上面显示有一个新的 aircraft 的索引生成了。

创建 Enrich policy

接下来,我们来创建 enrich policy。它告诉我们如何丰富数据。在 Kibana 中打入如下的命令:


PUT /_enrich/policy/flights_policy
{
  "match": {
    "enrich_fields": [
      "acars",
      "adsb",
      "built",
      "category_description",
      "engines",
      "first_flight_date",
      "icao_aircraft_type",
      "line_number",
      "manufacturer_icao",
      "manufacturer_name",
      "model",
      "modes",
      "notes",
      "operator",
      "operator_callsign",
      "operator_iata",
      "operator_icao",
      "owner",
      "reg_until",
      "registered",
      "registration",
      "seat_configuration",
      "serial_number",
      "status",
      "test_reg",
      "type_code"
    ],
    "indices": [
      "aircraft"
    ],
    "match_field": "icao"
  }
}

我们使用 execute enrich policy API 为该策略创建enrich索引:

POST /_enrich/policy/flights_policy/_execute

接着,我们创建一个叫做 flights_aircraft_enrichment 的 pipeline:


PUT /_ingest/pipeline/flights_aircraft_enrichment
{
  "description": "joins incoming ADSB state info with richer aircraft metadata",
  "processors": [
    {
      "enrich": {
        "field": "icao",
        "policy_name": "flights_policy",
        "target_field": "aircraft"
      }
    }
  ]
}

到此为止,我们已经成功地创建了 丰富策略。接下来,我们将展示如何使用这个 pipeline 来丰富我们的数据。

丰富数据

为了能够使用我们上面定义好的 pipeline,我们重参考之前的文章 “Observability:使用 Elastic Stack 分析地理空间数据 (一)”里的 fligths_logstash.conf 文件,并修改如下的 output 部分:


output {
    stdout { 
        codec => rubydebug
    }
 
    elasticsearch {
        manage_template => "false"
        index => "flights"
        # pipeline => "flights_aircraft_enrichment"
         hosts => "localhost:9200"
    }
}

我们把上面的这一行的注释拿掉:

# pipeline => "flights_aircraft_enrichment"

这样变成了:

output {
    stdout { 
        codec => rubydebug
    }
 
    elasticsearch {
        manage_template => "false"
        index => "flights"
        pipeline => "flights_aircraft_enrichment"
         hosts => "localhost:9200"
    }
}

在启动 Logstash 之前,我们可以先删除之前的 flights 索引:

DELETE flights

再接着执行如下的命令:

PUT flights
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "baro_altitude": {
        "type": "float"
      },
      "callsign": {
        "type": "keyword"
      },
      "geo_altitude": {
        "type": "float"
      },
      "icao": {
        "type": "keyword"
      },
      "last_contact": {
        "type": "long"
      },
      "location": {
        "type": "geo_point"
      },
      "on_ground": {
        "type": "boolean"
      },
      "origin_country": {
        "type": "keyword"
      },
      "position_source": {
        "type": "keyword"
      },
      "request_time": {
        "type": "long"
      },
      "spi": {
        "type": "boolean"
      },
      "squawk": {
        "type": "long"
      },
      "time_position": {
        "type": "long"
      },
      "true_track": {
        "type": "float"
      },
      "velocity": {
        "type": "float"
      },
      "vertical_rate": {
        "type": "float"
      }
    }
  }
}

重新运行 Logstash:

sudo ./bin/logstash -f fligths_logstash.conf

我们在 Kibana 中检查 flights 的 mapping:

GET flights/_mapping

我们可以看到一些新增加的各个新字段:

Observability:使用 Elastic Stack 分析地理空间数据

我们可以通过 search:

  "_source" : {
          "aircraft" : {
            "owner" : "Wells Fargo Trust Co Na Trustee",
            "reg_until" : "2021-04-30",
            "modes" : false,
            "built" : "1984-01-01",
            "acars" : false,
            "manufacturer_icao" : "BOEING",
            "serial_number" : "23018",
            "manufacturer_name" : "Boeing",
            "icao_aircraft_type" : "L2J",
            "operator_callsign" : "GIANT",
            "operator_icao" : "GTI",
            "engines" : "GE CF6-80 SERIES",
            "icao" : "a8a763",
            "registration" : "N657GT",
            "model" : "767-281",
            "type_code" : "B762",
            "adsb" : false
          },
          "true_track" : 272.81,
          "velocity" : 5.14,
          "spi" : false,
          "origin_country" : "United States",
          "@timestamp" : "2020-06-04T10:41:00.558Z",
          "request_time" : 1591267250,
          "time_position" : 1591267168,
          "last_contact" : 1591267168,
          "callsign" : "GTI165",
          "icao" : "a8a763",
          "location" : "39.0446,-84.6505",
          "on_ground" : true,
          "position_source" : "ADS-B"
        }
      }

我们可看到一个叫做 aircraft 的字段,它含有这个飞机所有被丰富的信息。

运用 Kibana 分析数据

找出前10的飞机型号

因为有新的字段进来,所以我们必须重新创建新的 inde pattern:
Observability:使用 Elastic Stack 分析地理空间数据

我们可以看到最多的是 PC-12/47E 这个机型。

找出飞机制造商的分布

Observability:使用 Elastic Stack 分析地理空间数据

我们看到 BOING 公司的市场份额是最大的。AIRBUS 处于第二的位置。

飞机机龄分布

Observability:使用 Elastic Stack 分析地理空间数据

我们可以看出来最多的飞机是2019年生产的。

飞机机型和飞行高度的关系

Observability:使用 Elastic Stack 分析地理空间数据

可以看出来 A320-214 飞机飞的是最高的。

Graph

运用 Graph 来找出数据直接的关系。如果你对 Graph 还不是很了解的话,请参阅我之前的教程 “Elastic Graph 介绍”。

Observability:使用 Elastic Stack 分析地理空间数据

点击 Create graph:
Observability:使用 Elastic Stack 分析地理空间数据

点击 Select a data source:

Observability:使用 Elastic Stack 分析地理空间数据

选择 flights* :
Observability:使用 Elastic Stack 分析地理空间数据

点击 Add fields:
Observability:使用 Elastic Stack 分析地理空间数据

添加 fields:
Observability:使用 Elastic Stack 分析地理空间数据

我们需要保持这个 graph。然后进行搜索:
Observability:使用 Elastic Stack 分析地理空间数据

从上面,我们可看出来 BOING 和我们想要的各个字段之间的关系。

我们从收集的数据可以有更多的其它的分析。在这里,我就不一一枚举了。你们可以做任何你想要的分析。

上一篇:Tablestore入门手册-数据管理-GetRow


下一篇:客服是人工智能落地的黄金场景(智能服务圆桌现场实录)