· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》
创作人:田雪松
审稿人:李捷
业务背景介绍
当代商业组织面临的最基本挑战,是互联网已经不再是一个替代或可选渠道,它已经成为许多企业最主要的、甚至是惟一的销售平台。网上店面在现实中往往比实体店面还要重要,所以人们就必须要像监视实体店面一样,监控网上应用。
监控系统通常会以推送(Push)或拉取(Pull)的方式,从服务或应用中获取监控指标(Metrics),并在监控指标出现异常时,发送警报或恢复服务,以实现网上店面实时可用的目标。
监控系统在提升了应用可用性同时,还在日积月累中形成海量监控数据,而从这些监控数据中挖掘出巨大的商业价值。监控指标会包含一些特定业务场景下的业务数据,借助大数据分析工具对它们进行处理和建模后,就可以辅助人们作出正确的市场决策。
本章要介绍的案例就是基于 Elasticsearch 对监控数据进行商业挖掘的一次尝试,我们内部称这套系统为 Prophet(大预言家)。Prophet 通过机器学习对历史监控数据进行处理,并创建预测模型,最终可以实现对单次用户请求处理的全面预测,包括处理结果是否成功、处理执行时长等等。
Prophet预测系统架构
Prophet 虽然只是监控数据最终的分析处理服务,但支撑 Prophet 实现智能预测的整个系统却涵盖了四个主要部分:
- 第一部分是产生海量监控数据的监控系统,它从被监控的业务系统中拉取监控数据,并提供实时告警服务。
- 第二部分则是将监控数据,从监控系统中导入到 Elasticsearch 集群的数据系统,它还负责对监控数据进行清洗、转换;
- 第三部分是用于存储历史监控数据的 Elasticsearch 集群,我们借助 Elasticsearch 冷热模型,延长了监控数据的保存时间;
- 而最后一个才是最终处理这些数据的 Prophet 服务,它通过 Spark 运算集群,实现智能预测功能,未来还会添加更多数据分析功能。
四个系统之间的数据流动关系如图 1-1 所示:
图1-1 Prophet周边生态
除了 Elasticsearch 以外,HDFS 也是存储历史监控数据的一种不错选项,并且 HDFS 更容易与 Hadoop 的大数据生态系统集成。
我们之所以采用 Elasticsearch 主要还是被 Elasticsearch 强大的检索能力所吸引。同时,Elastic Stack 家族中的 Kibana 还提供了强大的数据可视化能力,至于数据分析则可以利用 elasticsearch-hadoop 插件整合 Hadoop 生态。
使用 Elasticsearch 存储监控数据
在 Prophet 诞生之前,我们的业务系统已经使用 Prometheus 做到了实时监控,同时还通过 Grafana 对监控数据做了可视化处理。
但 Prometheus 监控数据存储,在其内置 TSDB(Time Serie Database)中,Prometheus TSDB 中并没有分片和副本等概念,而只是简单地将数据保存在本地硬盘上。这意味着 Prometheus 无法支持数据高可用,同时也意味着 Prometheus TSDB 中的数据不能保存太久,否则本地硬盘迟早会被积累的数据撑爆。所以 Prometheus 默认只保存监控数据 15 天,超过这个时间后监控数据就会被直接删除。
对于一个监控系统来说这并不算什么大的问题,因为监控系统通常对实时数据更感兴趣,它只要能够实时快速地反映,被监控系统的异常就足够了。而历史数据分析和处理,并不能算是监控系统的职责范畴,应该交由 Elasticsearch 这样更专业的数据检索与分析工具来实现。
从另一个角度来说,由于预测系统,必须要基于历史数据创建预测模型,而如果直接在 Prometheus TSDB 上进行高频度的模型运算,则有可能会对监控系统本身造成性能上的影响。从系统监控与数据分析的职责角度来看,监控系统的稳定性无疑比数据分析更为重要。所以预测系统使用的历史数据,应该与 Prometheus 隔离开来,这就要求监控数据得从 Prometheus 中同步到 Elasticsearch 。一旦数据从 Prometheus 进入到 Elasticsearch ,我们就可以利用 Elasticsearch 冷热架构对数据进行优化以保存更长时间。
Prometheus 与 Elasticsearch 的数据同步
Prometheus 支持一种称为远程读写(Remote Read/Write)的功能,可以在 Prometheus 拉取监控数据时将数据写入到远程的数据源中,或者从远程数据源中读取监控数据做处理。
Prometheus 目前支持的远程数据源多达几十种,所有数据源都支持远程写入,但并不是所有数据源都同时支持远程读取,比如我们这里要使用的 Elasticsearch 就只支持远程写入。
注:7.10版本是支持的,可以通过 metricbeat 对 Prometheus 进行远程拉取,参见:[__https://www.elastic.co/guide/en/beats/metricbeat/current/metricbeat-metricset-prometheus-query.html__]
Prometheus 为远程读取和写入,分别定义了独立的通信与编码协议,第三方存储组件需要向Prometheus 提供兼容该协议的 HTTP 接口地址。但由于众多第三方存储组件接收数据的方式并不相同,支持 Prometheus 读写协议的接口地址并不一定存在。为此,Prometheus 采用了适配器的方式屏蔽不同存储组件之间的差异,
如图1-2所示:
图1-2 Prometheus 远程读写
只要提供了适配器,Prometheus 读写数据就可以转换成第三方存储组件支持的协议和编码。Elasticsearch 也需要使用适配器,这个适配器就是由 Elastic Stack 家族中的 Beat 组件担任。早期 Prometheus 官方推荐的适配器是 Infonova 开发的 Prometheusbeat,而在最新版本中已经转而推荐 Elastic Stack 官方的 Metricbeat。
无论是 Prometheusbeat 还是 Metricbeat,它们都可以向 Prometheus 开放 HTTP 监听地址,并且在接收到 Prometheus 监控数据后将它们存储到 Elasticsearch 中。Metricbeat 采用 Module 的形式开启面向 Prometheus 的 HTTP 接口,具体配置如示例 1.1 所示:
- module: prometheus
metricsets: ["remote_write"]
host: "localhost"
port: "9201"
示例1.1 Metricbeat 配置
添加示例所示配置内容后重启 Metricbeat,它就会在本机 9201端口开始监听 Prometheus 的写入数据。与此同时还要将这个监听地址告诉 Prometheus,这样它才知道在拉取到数据后向哪里写入。具体来说就是在 prometheus.yml中 添加如下内容:
remote_write:
- url: "https://localhost:9201/write"
示例1.2 Prometheus 配置
使用 MetricBeat 组件保存监控数据,监控指标的标签(Label)会转换为 Elasticsearch 的文档字段。Metricbeat 还会附加一些 Metricbeat 自身相关的数据,所以文档中的字段会比实际监控指标的标签要多一些。
除了指标名称和标签以外,还包括主机地址、操作系统等信息,这可能会导致样本数据量急剧膨胀。但 Prometheus 并不支持在写入远程存储前过滤样本,所以只能通过 Beat 组件处理。
[注:最高效的做法是在 metricbeat 上设置 drop_fields 的 processor,直接过滤,避免无效的网络传输]
比较理想的方法是将 Beat 组件的输出设置为 Logstash,然后再在 Logstash 中做数据过滤,最后再由 Logstash 转存到 Elasticsearch 中。
在我们的系统中,除了要过滤以上 Metricbeat 附加的字段以外,还需要要组合、修正一些业务相关的字段。所以为了更好的处理数据,我们在整个数据系统中加入了 Logstash 组件.
最终整个数据系统的大致结构如图 1-3 所示:
图1-3 数据系统
Prometheus 在拉取到监控指标时,将数据同时发送给 Metricbeat 或 Prometheusbeat,它们在这里职责就是一个监控数据接收的端点。
Metricbeat 或 Prometheusbeat 再将数据发送给 Logstash,而 Logstash 则负责清洗和整理监控数据,并将它们存储到 Elasticsearch 中。我们将整个监控系统和数据系统部署在 Kubernetes 集群中,而最终用于机器学习的数据源 Elasticsearch 则被部署为独立的集群。
这样就将监控系统与数据分析系统隔离开来,从而保证了监控系统,不会受到大量数据分析运算的影响而出现故障。实际上图 1-3 就是对图 1-1中 所展示的监控系统、数据系统以及 Elasticsearch 集群的细化,但在实际应用中还有许多细节没有展现出来,需要读者根据项目实际情况做适当调整。
集成 Elasticsearch 与 Spark
事实上,新版 Kibana 组件中已经具备了非常强大的机器学习能力,但这项功能还未开放在基础授权中。此外我们的业务系统还有一些自身的特殊性,所以最终我们决定采用 Spark 编写代码实现这套智能预测系统。
一种显而易见的办法是利用 Elasticsearch 客户端接口,先将数据从 Elasticsearch 中读取出来再传给 Spark 进行分析处理。但 Spark 数据处理是先将任务分解成子任务,再将它们分发到不同计算节点上并行处理,以此提升海量数据分析处理的速度。而任务分解的同时是数据也要分解,否则最终数据读取,就会成为所有子任务的瓶颈,这种分解后的数据在 Spark 中称为分区(Partition)。每个任务在各自的数据分区上运算处理,最终再将各自的处理结果按 Map/Reduce 的思想合并起来。
Elasticsearch 中的分片(Shard)刚好与 Spark 分区相契合,如果能让任务运算于分片之上,这无疑可以更充分地利用 Elasticsearch 存储特征。但如果是先从 Elasticsearch 读取数据再发送给Spark做处理,数据就是经由分片合并后的数据,Spark 在运算时就需要对数据再次进行分区。
Elasticsearch 包含一个开源的 elasticsearch-hadoop 项目,可以很好地解决数据分区问题。elasticsearch-hadoop 可以与包括 Spark 在内的 Hadoop 生态良好集成,Spark 分区数据也可以与 Elasticsearch 分片数据直接对应起来。我们所要做只是将 elasticsearch-hadoop 的 Spark 依赖添加到项目中,并使用其提供的接口将数据从 Elasticsearch 中读取进来即可。智能预测系统采用 Java 语言开发,所以使用 Maven 引入如下依赖:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-${spark.major.version}_${scala.version}</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
示例1.3 引入依赖
如示例 1.3 所示,elasticsearch-hadoop 的 Spark 依赖为 elasticsearch-spark。由于需要用到Spark ML 相关库,示例也将 spark-sql 和 spark-mllib 依赖也一并引入了进来。elasticsearch-spark 在 org.elasticsearch.spark.rdd.api.java
包中定义了类 JavaEsSpark,它提供了大量用于读取和写入 Elasticsearch 的静态方法。其中,静态方法 esRDD
用于从 Elasticsearch 中读取索引,而 saveToEs
则用于向 Elasticsearch 中存储数据。
示例展示了从 Elasticsearch 中读取索引数据的代码片段:
List<Row> jobs = esRDD(sparkContext, metricIndex, queryString).values().map(
map -> {
Object[] values = new Object[featureFields.length+1];
for (int i = 0; i < featureFields.length; i++) {
values[i] = map.get(featureFields[i]);
}
values[featureFields.length] = map.get(labelField);
return RowFactory.create(values);
}
).collect();
StructField[] structFields = new StructField[featureFields.length + 1];
for (int i = 0; i < featureFields.length; i++) {
structFields[i] = new StructField(featureFields[i], DataTypes.StringType, true, Metadata.empty());
}
structFields[featureFields.length] = new StructField(labelField, DataTypes.StringType, true, Metadata.empty());
StructType schema = new StructType(structFields);
Dataset<Row> dataset = sparkSession.createDataFrame(jobs, schema);
示例1.4 使用 Spark 读入数据
示例中,sparkContext
是 SparkContext
实例,代表了与 Spark 运算环境相关的一些基本信息。MetricIndex 则是 esRDD
方法要读取文档数据的索引名称,而 queryString 则是检索文档的查询条件。esRDD
返回的类型为 JavaPairRDD
,这是类似于一个 Pair 的集合。
每一个 Pair 代表索引中的一个文档,Pair 的 key 是字符串类型,代表了当前文档的标识符;而 value 则是一个 Map
类型,代表了整个文档数据。通过 JavaPairRDD
的 keys 方法将只返回所有文档的标识符,而通过 values 方法则会只返回所有文档的 Map 对象。
Map 的键为文档字段名称,而值则为文档字段对应的具体数值。示例中的代码就是通过调用 values 方法只获取文档的 Map 对象,然后再通过 map 和 collect 方法将它们转换成 Row 的集合。
分类与回归
机器学习本质上是基于统计学原理,构建数学模型的过程,而构建出来的模型又可以用于数据分析与数据预测。机器学习一般分为监督学习和无监督学习两大类,它们分别针对有标注(Label)数据和无标注数据构建数学模型。
标注可以认为是对数据在某一维度上的识别,比如让计算机识别声音或图像中的文字,就需要先对它们做标注,然后才可以根据标注对声音或图像构建数学模型。具体来说,人们先要提供一组与文字关联好的音频或图片,计算机再根据这些标注好的数据,找出从声音频率、图片像素到文字的映射关系。由于不同人对文字的发音和书写存在着巨大的差异,所以需要通过海量音频和图像进行统计分析,找出这种映射关系中最为本质的转换模型。所以监督学习的本质是学习输入到输出的统计学规律,只不过这些映射关系并不像加减法那样直观,需要借助计算机统计与分析才能完成。
无监督学习则是根据无标注数据进行数学建模的过程,它不存在输入与输出,本质上是学习数据中天然的统计规律或潜在结构。比如用户的消费行为就很难根据他们的年龄、性别、学历等特征做区分,而且由于涉及隐私这些数据也不一定会在系统中查到。换句话说,即使两个人的硬性特征完全相同,但在人生经历、思想认识等方面存在着的差异,依然可能使他们的消费行为不同。所以这时就只能通过用户消费行为产生的数据对他们进行聚类,相同聚类的用户往往有着相类似的消费习惯,聚类结果也就可以应用于商品推荐系统之中了。
本章介绍的智能预测系统是一种通过监督学习,构建预测模型的机器学习系统,本质上就是发现已知条件与未知结果之间的统计规律。我们之所以要建立这样一个系统,是因为在以往对业务系统日志的统计分析中发现,在某些特定条件下某些用户请求一定会失败。这让我们坚信在已知请求条件与未知运行结果之间,一定存在着某种映射关系,我们希望借由这样一个系统找到这种神秘的映射关系。
在监控系统的数据中原本就包含了请求执行结果,这个数据就可以作为整个请求与处理相关数据的标注。在监督学习中,作为输入的数据一般称为特征(Feature),而输出的结果则一般称为标注(Label)。在示例中定义的 featureFields 定义的就是索引上那些可以作为特征的字段,而 labelField 则是索引上可以作为标注的字段。示例中的代码就是将特征字段的值和标注字段的值从 Map 中取出,并将它们统一放置到一个数组中并转换为 Spark 运算需要使用的 DataFrame。
正如前面所介绍的那样,监督学习的本质是学习输入到输出的统计学规律。如果从输入到输出的产出结果是有限的,那么这时机器学习生成的模型称为分类器(Classifier),而它解决的问题就是分类问题。比如我们正在介绍智能预测系统,它通过已知的请求条件预测执行结果,输入是在请求执行前已知的请求条件,而输出则是该请求的执行结果。
执行结果只有两种结果,要么成功要么失败,所以计算机学习出来的数学模型就是一个分类器。除了分类问题以外,监督学习可以解决的另一种类问题是回归问题。与分类问题不同,回归问题输出的不是有限数量的结果,而是数值连续的结果,它更像我们在数学中学习到常规函数。
同样是以请求条件作为输入,如果预测的不是执行结果而是执行时间,那么这时要解决的就不再是分类问题而是回归问题。因为请求的执行时间并不是离散的,它可能是任意一个非负的数值。
Spark机器学习支持多种实现分类与回归问题的算法,包括逻辑回归、决策树、随机森林、梯度提升树等等。我们很难在一个章节中将上述算法完全讲解清楚,但所幸应用这些算法做预测也并不需要深入了解算法原理。本章仅以梯度提升树为例解决分类问题,也就是通过梯度提升树实现对请求执行结果的预测,读者可根据本章所列方法实现对请求执行时间的预测。
Spark 在实现机器学习时一般会将数据分成两组,一组用于构建数学模型,而另一组则用于验证已构建的模型。
示例中的代码就是将示例中生成的数据按 8:2 分成两组,8 成的数据用于生成模型,而 2 成的数据用于验证模型:
Dataset<Row>[] splits = featurized.randomSplit(new double[]{0.8, 0.2});
GBTClassifier gbt = new GBTClassifier()
.setLabelCol(LABEL_COL)
.setFeaturesCol(FEATURES_COL)
.setMaxIter(10);
GBTClassificationModel model = gbt.fit(splits[0]);
Dataset<Row> predictions = model.transform(splits[1]);
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setLabelCol(LABEL_COL)
.setPredictionCol(PREDICTION_COL)
.setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
log.info("Test Error = " + (1.0 - accuracy));
model.write().overwrite().save(path);
示例1.5 生成预测模型
如示例所示,GBTClassifier 就是基于 GBT 算法的分类器,通过它可以生成基于 GBT 算法的分类模型。setLabelCol 和 setFeaturesCol 分别设置了数据的标注和特征,而 setMaxIter 则设置了生成模型时迭代的次数。有了 GBT 分类器后,调用其 fit 方法并将8成的数据 splits[0] 传入就会触发模型创建的运算。
模型生成的运算通常比较慢,具体时间长度取决于参与运算的数据量大小,但一般都是分钟级别的。由于模型一旦生成往往可以复用,所以在数据变化不是特别剧烈的情况下,可以将模型保存到硬盘。这样在需要预测时就不用再通过匹配数据生成模型,通过硬盘中保存的现成模型数据就可迅速生成模型。所以在我们的实现中,会每天定时更新一次模型到硬盘,但在实例应用模型时,都是直接从硬盘中加载模型以节省运算时间。
示例展示了从硬盘中加载模型,并根据输入的特征向量进行预测的代码片段:
GBTClassificationModel model = GBTClassificationModel.load(path);
Dataset<Row> predictions = model.transform(featurized);
predictions.show(false);
示例1.6 实现预测
示例中使用的 tranform 方法在这里就是最终用于预测的方法,传入的 featurized 则是经过向量化的已知数据。
当然,以上示例代码中省略大量的代码细节,在实际应用中还有许多更为具体的问题要解决。根据我们应用的效果来看,预测结果的准确率可以达到 95% 以上。
回归类问题的处理过程与本章所介绍内容类似,感兴趣的读者可根据本章内容尝试实现对请求处理时间的预测。