ElasticSearch+Spark 构建高相关性搜索服务&千人千面推荐系统

ElasticSearch(下简称ES)是基于Lucene的一个开源搜索引擎产品。Lucene是java编写的一套开源文档检索的基础库,包括词、文档、域、倒排索引、段、相关性得分等基本功能,而ES则是使用了这些库,搭建的一个可以直接拿来使用的搜索引擎产品。直观地理解,Lucene提供汽车零部件,而ES直接卖车。

说起ES的诞生,也是个很有意思的故事。ES的作者Shay Banon——“几年前他还是一个待业工程师,跟随自己的新婚妻子来到伦敦。妻子想在伦敦学习做一名厨师,而自己则想为妻子开发一个方便搜索菜谱的应用,所以才接触到Lucene。直接使用Lucene构建搜索有很多问题,包含大量重复性的工作,所以Shay便在Lucene的基础上不断地进行抽象,让Java程序嵌入搜索变得更容易,经过一段时间的打磨便诞生了他的第一个开源作品Compass,中文即’指南针’的意思。之后,Shay找到了一份面对高性能分布式开发环境的新工作,在工作中他渐渐发现越来越需要一个易用的、高性能、实时、分布式搜索服务,于是他决定重写Compass,将它从一个库打造成了一个独立的server,并将其改名为Elasticsearch。“

 

可见鼓捣起来的程序员是多么有爱,虽然据说Shay Banon承诺给妻子的菜谱搜索还没问世……

本文大概地介绍了ES的原理,以及Wetest在使用ES中的一些经验总结。因为ES本身涉及的功能和知识点非常广泛,所以这里重点挑出了实际项目中可能会用到,也可能会踩坑的一些关键点进行了阐述。

重要概念
集群(Cluster): ES是一个分布式的搜索引擎,一般由多台物理机组成。这些物理机,通过配置一个相同的cluster name,互相发现,把自己组织成一个集群。

节点(Node):同一个集群中的一个 Elasticearch主机。

主分片(Primary shard):索引(下文介绍)的一个物理子集。同一个索引在物理上可以切多个分片,分布到不同的节点上。分片的实现是Lucene 中的索引。

注意:ES中一个索引的分片个数是建立索引时就要指定的,建立后不可再改变。所以开始建一个索引时,就要预计数据规模,将分片的个数分配在一个合理的范围。

副本分片(Replica shard):每个主分片可以有一个或者多个副本,个数是用户自己配置的。ES会尽量将同一索引的不同分片分布到不同的节点上,提高容错性。对一个索引,只要不是所有shards所在的机器都挂了,就还能用。主、副本、节点的概念如下图:

 

 


索引(Index):逻辑概念,一个可检索的文档对象的集合。类似与DB中的database概念。同一个集群中可建立多个索引。比如,生产环境常见的一种方法,对每个月产生的数据建索引,以保证单个索引的量级可控。索引->类型->文档,ES中的文档以这样的逻辑关系组织了起来。

类型(Type):索引的下一级概念,大概相当于数据库中的table。同一个索引里可以包含多个 Type。 个人感觉在实际使用中type这一级常常用的不多,直接就在一个索引中建一个type,在这个type下去建立文档集合和进行搜索了。

文档(Document):即搜索引擎中的文档概念,也是ES中一个可以被检索的基本单位,相当于数据库中的row,一条记录。

字段(Field):相当于数据库中的column。ES中,每个文档,其实是以json形式存储的。而一个文档可以被视为多个字段的集合。比如一篇文章,可能包括了主题、摘要、正文、作者、时间等信息,每个信息都是一个字段,最后被整合成一个json串,落地到磁盘。

映射(Mapping):

相当于数据库中的schema,用来约束字段的类型,不过 Elasticsearch 的 mapping 可以不显示地指定、自动根据文档数据创建。

Elasticsearch很友好地提供了RestFul的API,可以通过HTTP请求直接完成所有操作。比如下面官方的一个例子,往索引twitter添加文档,type是tweet,文档的id是1:

 

相应地,根据user字段检索文档:


关键配置项
1、索引的shards个数: 
shards的个数,最好是和节点数相关的。理论上对同一个索引,单机上的shards个数最好不要超过两个,这样每个查询尽可能并行。但因为ES中shards的个数是确定了就没办法再调整的,所以如果考虑到数据会高速增长,一开始分配多些也可以。另一个常见思路是按时间纬度(如月)去定义ES索引——因为可以动态调整新加的索引的shards个数。其他的一些情况,比如下面举到的Wetest聚合的例子,因为需要数据尽量地按照渠道切分开,所以定义了很多个shards(200个),但太多的shards通常是不推荐的,ES管理起来也有开销。

2、heap内存: 
官方建议是可用内存的一半,是通过启动ES的环境中,定义环境变量的方式完成的。如export ES_HEAP_SIZE=10g

3、cluster.name: 
集群的逻辑名称。只有cluster name相同的机器,才会在逻辑上组成一个集群。比如,内网中有5台ES机器的实例,是可以构成几个互不干扰的ES集群的。

4、discovery.zen.minimum_master_nodes: 
这个是用于集群的分布式决策的最少master机器个数。和常见的分布式协调算法一样,为了避免脑裂现象,建议超过一半的机器,n/2+1

5、discovery.zen.ping.unicast.hosts: 
ES集群的机器列表。注意ES单点不用配置集群中的所有机器列表,像一个连通图一样,只要每台机器配置了其他机器,而这些配置又是互相可以连接的,那ES最终就会发现所有机器,构成集群。如[‘111.111.111.0’,’111.111.111.1’,’111.111.111.2’]

mapping
mapping类似于数据库里的表结构,定义个mapping就意味着创建了一个索引。与数据库不同的是,一个索引并不需要显示地建立mapping,比如,上面那个在twitter索引插入文档数据的例子,如果执行的时候还没有定义索引,ES便会根据文档的字段和内容,自动创建索引和mapping。然而,这样创建的索引字段,往往可能不是我们所需要的。所以,还是自己预先通过手动定义mapping来创建索引比较好。下面是创建mapping的例子,这个例子在my_index这个目录下,为user、blogpost这些type创建了mapping。其中properties下面是各种字段的定义,包括了string、数值、日期等类型的定义。

如图中的红框部分,这个例子中有两个需要注意的地方:

1、user_id是string类型的,但它的index被定义为了“not_analzyed”,这个需要搞清其中的意义:通常,搜索引擎中全文检索的功能简单说是这样实现的:对原始文档进行分词后用这些词去建立倒排索引,在线上检索时,再将用户的查询词进行分词,用分词结果去拉取多个倒排索引的拉链结果、归并、相关性排序等,得到最终结果。但是,对于有些string类型的字段,其实并不想建倒排,就只想精确匹配,比如用户的名字,只想查到name字段精确为“张三”的人,而不是分词后得到的“张四”和“李三”两个人,这个时候,就需要定义index类型字段。这个字段有no、analyzed、not_analyzed三种类型,no是压根儿不给这字段建索引,analyzed是分析和按全文检索的方式建,not_analyzed是完全匹配的关键词查询方式。

2、date类型,创建mapping时需要通过“format”指定录入的多种可能时间格式。这样创建文档的时候,ES会根据输入文档的字段自动去确定是哪一种。不过直观地想象下,在创建文档时,指定明确的时间格式,省去ES动态判断的开销,应该会提升些微小的性能。此外,要注意,epoch_second(秒单位时间戳)和epoch_millis(毫秒单位)尽量不要混用,如果非要混用也要在插入的时候明确指明是哪个。曾经踩过坑,插入epoch_second的是秒级时间戳,但ES优先认为是毫秒,导致时间被缩小1000倍,最近的时间变成了1970年当年的某个时间。


下图列出了ES当前版本中可以进行mapping的数据类型、内置的字段、mapping操作可以携带的参数。因为篇幅原因这里就不详细解释了:

 


这里要详细介绍的,是上图中红框标出的,我们创建mapping时实际用到的比较关键的两个内置类型,和两个mapping参数。这几个都会直接影响最后索引访问的性能:

 

 

1)_source: es会把所有字段拼成一个原始的json落入磁盘,所以这个可以理解为全量原始数据,他不能用来索引,却可以在需要的时候返回。注意尽量不要禁用,比如禁用后,用script去update就不支持了。

2)_all:一个“伪”字段,用来实现模糊的全文索引。可以这样理解:在建索引的时候,把所有字段拼成一个字符串,然后对这个“大”字段进行切词,建倒排,然后这个字段就被丢弃了,没有真正落入磁盘。当全文检索时,如果没有指明查询的域,比如标题、正文(这种是很常见的),就从这个大的倒排中拉取文档拉链。可以想象,一些标记或值类型的字段,如日期、得分,这种在全文检索时是没意义的,就可以不包含在_all内,而文本域,如title、doc,就包含在_all之中。这些都是在建mapping时可以、而且最好指定的。

3)doc_values: doc_values和下面的field_data都是在聚合(后面会介绍)、排序这些统计时用的参数,默认都是开启的。排序、聚合,这种在文档全局进行的工作,用倒排索引肯定不合适。所以,对not_analyzed(即不建倒排)的字段,doc_values用一种列模式的方式(可以参考hbase)来存储文档的正排,方便在文档全局做统计。doc_values是存储在磁盘的,如果你明确有些字段只是展示,不用于统计的话,可以把这个禁用掉。Doc_values一定不会对analyzed域建索引(都切词了,想想也不合适,怎么建列索引嘛),而是用下面的field data。

4)field_data:对analyzed的文本域,比如正文,其实也会有统计的需求(比如ES也支持按一些关键词对文档进行聚合统计,但这种任务常用的方法是通过离线工具,如hadoop或者单机的分析,做好了后推送到在线索引,直接在ES去算其实感觉有些奇怪)。虽然并不适合在搜索引擎中做,但你真的做了,es也会把这个数据动态地load内存的一个field data中进行运算。所以,想想就知道,这是个非常耗内存的操作,很可能把jvm heap吃完了!!es默认是只打开,但不load,只是在你需要进行analyzed域的排序和聚合的时候,才去动态load这个内存(lazy的方式)。所以,尽量不要在查询的时候去打开这个潘多拉魔盒,或者干脆就把这个选项关掉吧。

聚合
谁说搜索引擎只能用来搜索?ES不仅能搜索,还能在搜索的结果集合上直接进行统计,很强大吧。ES目前稳定的非实验阶段聚合主要分两种:Metrics Aggregation(指标聚合)和Bucket Aggregation(桶聚合)。

指标聚合主要指常规的集合数学统计类运算,如官方guide的这个例子:找到交易的所有红色的车,然后求它们的平均价格:

结果大概是这样的:

ElasticSearch+Spark  构建高相关性搜索服务&千人千面推荐系统

ElasticSearch+Spark  构建高相关性搜索服务&千人千面推荐系统

神奇吧~指标运算还包括其他,如最大、最小、求和、个数、地理坐标运算等。然而我们今天要进行实例讲解的则主要是Bucket Aggregation,桶聚合。桶聚合是指把文档,按照某个给定字段分成不同的组,然后在组内进行进一步聚合运算,并返回桶级的结果。比较直观的理解,如:直方图、分时间段统计等等。如下面这个例子,是桶聚合中的term聚合,即按照color这个字段,精确匹配后进行分桶,然后桶内还进一步嵌套了平均价格聚合、和按制造商进一步的分桶聚合。

统计的结果类似下面这样,红色的车共有4辆,平均价格是32500,并且又包含了3辆本田和1辆宝马:

上面是简单的例子。在我们的WeTest舆情中,有论坛热帖这样一个功能,即,实时统计某个数据源中(如百度贴吧),某个论坛里(如王者荣耀吧),一段时间内(如3个月),回复数最多的TopN个帖子。

这个功能现在在线上的实现方法就不详细介绍了,大致是从数据库和Hbase中扫描对应的数据,维持一个堆,获取出TOP N的思路。一方面是稍微有些耗时,另一方面是请求量很大时可能对DB和Hbase的访问带来压力,所以也想找一种备选的方案,我们想到了用ES。

为了用ES的桶聚合,我们首先设计如何存储文档(即所有用户评论)的方案。由于数据量非常大(十亿级),所以我们首先想到了把文档按时间分成不同的索引(如按月),然后在指定月份(如3个月)的索引上,聚合出评论最多的Top帖子。然而这样是有问题的:当在多个ES索引上聚合时,ES不会把所有索引的结果放在一起聚合TopN,而是单独在每个索引求得TopN后,再放在一起聚合。这是个使用时要注意的小坑。这样导致的结果是,直接在多个索引上聚合出的TopN,并不是真正的TopN(比如3个月中,每个月都是不是Top 1,但三个月加起来就是Top了 1。局部最优不等于全局最优)。

所以,从时间上切分,这条路基本被堵死了。那只能从空间上切分了(您问能不能不切分?十亿级的数据量,上百个GB,不切分的话,乖乖,每次都要从这几百GB的文件里找东西,想想也知道有多慢了…)。从空间切分,同样需要考虑两个问题:1)如何将数据hash到shards。2)切分多少个shards。对于第一个问题,因为我们的聚合统计是在每个渠道(可以理解为论坛)下的,不会跨渠道,所以,按照渠道ID进行shards分配,把相同论坛的数据hash到一个shard即可。这样,每次请求某个渠道的聚合结果,把请求按渠道ID routing到对应的shard去运算。对于第二个问题,要看具体的规模了。我们的数据量有上百G,数据源上千个,所以我们希望每个shard上的内容尽量少,保证在单个shard上聚合的时候会更快,当然shards个数又不能太多,否则会给ES引入非常大的管理开销。综合下来,我们选择的shards个数是200个。

遗憾的是,ES只能根据你指定的key(论坛ID)去做hash后进行路由,这就导致了不同的shards上数据不是完全平均的,最多的能超过10GB,最少的只有几十MB。如果哪一天,ES如果开放自定义routing规则或者对shards数据进行均衡的方法,那就好了。

ES经常为人诟病的一个地方是建索引比较慢,10亿数据的索引构建时间要花几天。这也容易理解,天下没有免费的午餐,读写的性能往往是互斥的,快速读取和检索意味着大量索引和辅助数据的预先建立,那写入时势必会慢。如何取舍,需要看实际的业务场景而定了。下面就是建好索引后,去聚合某论坛内指定时间段内Top帖子的接口调用方式。

 然后,我们按连续统计最热的TopN(N为不同的个数)个渠道内的Top30热帖结果的方式分别对ES和线上已有的服务进行了测试:

上面的五个结果图直观地反应了用现在Wetest舆情线上的常规统计方式和ES聚合统计的方式获取结果的耗时。

从结果中,我们大概推断出了ES统计聚合运算的做法:先把所有符合过滤条件的数据全部检索出来,然后在内存中进行排序和聚合运算。也就是说,符合条件的数据量级越大,聚合运算越慢。本着这个原则,结果图也就比较好理解了:

1)在连续对最热的Top1000个渠道去进行热帖聚合时,ES的表现大部分都优于现有实现。这是因为Top1000的渠道中,大部分渠道被分在了非常小的shards上,有的只有几MB,数据量很小,在这样的shards中聚合,是很快的。

2)时间纬度上,统计3个月的数据,ES大部分情况下都比现有方法慢,而1个月或1天的情况下,ES都要快。这是因为3个月的条件下,符合条件的数据量级增大(最大的一个话题下有3万跟帖),ES的运算效率下降比较厉害。

3)从Top1000到Top10,ES的总时间逐渐变差于现有方法。这是因为,空间纬度上,Top10渠道符合条件的数据量级非常大,所以ES的运算效率下降比较厉害。

做了这个实验后,ES在WeTest头部数据源上的聚合速度并不比现在快,但在中部和长尾上的效果更优,这说明ES的聚合受候选集数据量的影响非常大,所以是否切换这种方式也还没最终决定。不过,这个实验证明了ES聚合的强大能力,至少,不用自己写什么代码,只通过接口调用就能把这样海量数据的统计运算完成了,还是很方便的一件事情,同时性能也不错。如果自行实现的统计运算中会增大DB的压力,那么通过ES聚合分离这部分请求,也是一个非常好的选择。

上一篇:c#-BLToolkit中的字段映射到类类型的属性


下一篇:春季春季安全瓷砖.如何使用磁贴页面定义设置表单登录?