CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实战手册

CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实战手册

· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》

· 加入创作人行列,一起交流碰撞,参与技术圈年度盛事吧

创作人:吴斌
审稿人:李捷

发挥 Elastic Stack 在日志和实时数据分析计算领域的一些优势,对流媒体服务这样规模较大、实时性要求偏高,且分析、业务探索流程要求灵活的业务是一个比较百搭的选择。

数据逻辑架构如下

CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实战手册

整体架构比较直观简单。我们省去了业务组建和存储层可能会用到的其他引擎,把目光主要集中在 Elasticsearch 上。

日志采集

日志的收集在正式进入数据管道前,可以选择落地或者直接吐到消息队列。这里采集的内容也主要分成 2 大部分

  • CDN/网络访问日志
  • 业务打点数据

业务打点的数据可以根据需求采集,实时部分主要会聚焦在,例如卡顿等这样的用户体验指标。网络访问的日志通常比较通用,这里我们也先给出一个例子,相信大家看上去会比较熟悉。

那么根据这里样例的数据,Elasticsearch 可以轻松的利用内置的 processor 和聚合功能做快速的分析,后面我们会举例说明。

{
    "receiveTimestamp": "2021-04-28T14:30:17.90993285Z",
    "spanId": "blahblah",
    "trace": "blah/f5c7578feaf277dd9a8d96",
    "@timestamp": "2021-04-28T14:30:17.549287Z",
    "logName": "logs/requests",
    "jsonPayload": {
      "@type": "loadbalancing.type.LoadBalancerLogEntry",
      "latencySeconds": "0.001749s",
      "statusDetails": "response_from_cache",
      "cacheIdCityCode": "ABC",
      "cacheId": "ABC-abcabc123"
    },
    "httpRequest": {
      "remoteIp": "10.0.0.1",
      "remoteIpIsp": {
        "ip": "10.0.0.2",
        "organization_name": "China Telecom",
        "asn": 8346,
        "network": "10.0.0.0/15"
      },
      "requestMethod": "GET",
      "responseSize": "125621",
      "userAgent": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840",
      "frontendSrtt": 0.124,
      "cacheLookup": true,
      "geo": {
        "continent_name": "Asia",
        "country_iso_code": "CN",
        "country_name": "China",
        "location": {
          "lon": 123,
          "lat": 321
        }
      },
      "backendLatency": 0.001749,
      "requestUrl": "http://bindiego.com/vid/bindigo.m4s",
      "requestDomain": "bindiego.com",
      "cacheHit": true,
      "requestSize": "671",
      "requestProtocol": "http",
      "user_agent": {
        "original": "Mozilla/5.0 (Linux; Android 10) Bindiego/7.1-1840",
        "os": {
          "name": "Android",
          "version": "10",
          "full": "Android 10"
        },
        "name": "Android",
        "device": {
          "name": "Generic Smartphone"
        },
        "version": "10"
      },
      "status": 200,
      "resourceType": "m4s"
    }
  }

这里就是最终导入到 Elasticsearch 里可分析的网络性能数据。针对这个数据,我们分别对它经过的管道和处理做一个简单快速的剖析。

消息队列

日志采集器会直接把数据打到消息队列,这里主要起到一个抗反压缓冲的作用。有些队列有很多附加的功能,例如存储和窗口计算,这里我们只使用最单纯的功能。因为后面我们选取了分布式计算引擎来做这这部分。

分布式计算引擎

分布式计算引擎,其实在整体实时数据分析业务里,扮演的着实是非常重要的角色。例如实时指标的窗口计算,迟报数据的修正等等。但在我们这个简单的场景下为了后面在 Elasticsearch 内更方便快捷的分析、过滤数据。

我们这里主要做了 ETL 和补全。例如把请求资源的域和资源类型提取出来,还有 CDN 缓存节点的区域代码等等。但是例如 IP 地址地理位置、用户设备类型和运营商(ISP)的反查,方便起见,我们利用了 Elasticsearch Ingest 节点预置的 Pipeline 去做。

这里要注意的就是,如果你的集群配置是全角色的节点,会对数据节点的性能有影响。建议使用独立的 ingest node 去做,且如果是在 K8S 上部署的话,还可以弹性扩容这组 nodeSet。

下面是 Ingest 节点配置举例

完整代码戳这里:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/index-gclb-pipeline.json
{
    "description": "IP & user agent lookup",
    "processors": [
      {
        "user_agent" : {
          "field" : "httpRequest.userAgent",
          "target_field" : "httpRequest.user_agent",
          "ignore_missing": true
        }
      },
      {
        "geoip" : {
          "field" : "httpRequest.remoteIp",
          "target_field" : "httpRequest.geo",
          "ignore_missing": true
        }
      },
      {
        "geoip" : {
          "field" : "httpRequest.remoteIp",
          "target_field" : "httpRequest.remoteIpIsp",
          "database_file" : "GeoLite2-ASN.mmdb",
          "ignore_missing": true
        }
      }
    ]
}

数据安全

数据安全这块顺带提一下,现在 Elasticsearch 的认证、授权都可以在 Basic License 里使用了,非常方便。这里简单提一下通讯这块,很多小伙伴用的是自签的证书。这个问题不大,经常被问到在使用 RestClient 开发的时候如何绕过去(例如在写计算引擎最后入库的时候)。其实方法也很简单,这里就给大家上个代码片段说明看下.

配置:https://github.com/elasticsearch-cn/elastic-on-gke#option-2-regional-tcp-lb

完整代码:https://github.com/cloudymoma/raycom/blob/streaming/src/main/java/bindiego/io/ElasticsearchIO.java#L273-L296

try {
    SSLContext context = SSLContext.getInstance("TLS");

    context.init(null, new TrustManager[] {
        new X509TrustManager() {
            public void checkClientTrusted(X509Certificate[] chain, String authType) {}

            public void checkServerTrusted(X509Certificate[] chain, String authType) {}

            public X509Certificate[] getAcceptedIssuers() { return null; }
        }
    }, null);

    httpAsyncClientBuilder.setSSLContext(context)
        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
} catch (NoSuchAlgorithmException ex) {
    logger.error("Error when setup dummy SSLContext", ex);
} catch (KeyManagementException ex) {
    logger.error("Error when setup dummy SSLContext", ex);
} catch (Exception ex) {
    logger.error("Error when setup dummy SSLContext", ex);
}

索引管理

索引生命周期管理 Elasticsearch 也提供了非常便利的工具。

生命周期配置,这里应该根据业务需求和节点规模综合考量

{
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_size": "20GB",
                        "max_docs": 20000000,
                        "max_age": "7d"
                    }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
}

索引模版

完整版:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/index-gclb-template.json

模版为每次生成的索引应用相同的配置,且指定了生命周期的政策文件和注入别名。

{
    "index_patterns": [
        "bindiego*"
    ],
    "order": 999,
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1,
        "final_pipeline": "bindiego",
        "index.lifecycle.name": "bindiego-policy",
        "index.lifecycle.rollover_alias": "bindiego-ingest"
    },
    "mappings": {

最后我们配置了脚本一次性把上述配置应用,且在 Kibana 里为我们建立好查询的 index pattern

详细戳这里:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/init.sh

数据面板

这里虽然是个人弱项,但是借助 Kibana 强大的可视化功能,可以根据第一部分整理出来的数据绘制实时面板。

完整可复用面板:https://github.com/cloudymoma/raycom/blob/gcp-lb-log/scripts/elastic/gclb_dashboard.ndjson

部分截图:https://github.com/cloudymoma/raycom/tree/gcp-lb-log#dashboards-in-kibana

下面我举一些可能常被忽视的好用功能给大家打个样。

  1. IP 反查出的 Geo 和 ISP 信息

CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实战手册

CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实战手册

通过这些信息,可以快速反映出各个运营商网络的情况,甚至一些盗链的线索初判断。

  1. Vega 在 Kibana 里绘制数据

CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实战手册

当我们觉得 Kibana 自身图表不够丰富的时候,可以借助 Vega。上面这个图就展示了来自不同地区的用户,分别命中 CDN 缓存点的流量分配。数据通过用 Elasticsearch 的 Composite Aggregation 提取。

  1. Kibana TSVB

这个是我个人最喜欢的绘图方法了,可以非常灵活的对指标进行计算。下面这两个图表就展示过滤出直播业务的缓存命中、请求返回和缓存填充的数据量这些信息。

CDN 流媒体服务实时分析 Elasticsearch 实践—Elastic Stack 实战手册

总结

由于业务数据的敏感性,这里就不列举细节了。但数据管道和治理,都依旧遵循同样的原则。整体数据管道的选型也非常灵活,采集部分即可以是 Beats 生态中的产品,也可以是自己开发的 agent。队列常用的有 Kafka 或者云上托管服务。分布式计算层因为业务比较简单,我比较推荐使用 Apache Beam,这样执行引擎可以在比如 Flink、Spark Streaming 和任何 Beam 支持的平台上相对灵活的切换。

今天我们给出的案例是一个非常简单,且可以快速复用的开源项目。

大家有任何需求和疑问也欢迎到社区一起交流、学习。

创作人简介:
吴斌,Elastic 中文社区副主席,现就职于大型互联网公司任职云架构师。专注于海量数
据处理、挖掘、分析和企业级搜索领域。十分熟悉分布式应用,高可用架构和自动化技
术。曾在海外世界百强大学计算机学院任教 6 年。更是一位开源软件社区的积极贡献者
和组织者。
博客:https://gist.github.com/bindiego
上一篇:行在说 | 从阿里巴巴大数据之路看企业中台战略


下一篇:Serverless 工作流适用场景及最佳实践