2018 Elastic Meetup南京交流会,由赵伟带来以“ElasticLog with ES in CloudEdge”为题的演讲。本文首先介绍了CloudEdge与ElasticLog是什么,其次介绍了产品的构架图以及ES的作用,最后介绍了ES在实践过程中需要设计Index、分配Shard、快速将Spark里数据写入ES中和数据去重。本文提出的产品已经解决了一些瓶颈问题,但还有一些问题仍在路上。
数十款阿里云产品限时折扣中,赶快点击这里,领劵开始云上实践吧!
直播视频回顾
以下内容根据现场分享整理而成。
CloudEdge与ElasticLog
CloudEdge
CloudEdge是一款自注册、自配置、自充值的网关产品,它可以分为两部分,一部分是硬件,另一部分是盒子。CloudEdge通常部署在客户网络的进出口,所有的流量都会经过这个盒子进行安全扫描,并做相应的流量控制。
CloudEdge产品主要面临着三方面的问题,第一方面是如何快速处理、快速存储大量数据量的问题,以供用户去查询。第二方面是要满足用户想要查某个时间段的数据不需查整段数据的需求,研究人员应当怎样做。第三方面是由于产品面对的是日本客户,日本客户比较谨慎,他们追求数据一条不能丢,一条不能多的精确度,研究人员应当怎样做。
CloudEdge
CloudEdge对流量进行扫描并进行控制后,会产生大量数据,例如网络访问相关的数据等。Project ElasticLog是负责将不安全的数据进行拦截,接着将所有的数据按照一定的格式写到软件中,让用户能够看到自己所需要关注的信息。最终通过盒子产生数据,Project ElasticLog负责将不安全的数据进行拦截后,做出跟业务相关、数据相关的scalable 系统。
架构图
目前此架构已经基本实现,但在部署上依旧存在一些问题。在上图的架构中,蓝色部分是数据的走向,直到数据落地,绿色部分负责数据落地后用户查询数据。在上图中,首先,Client负责将数据聚合后上传到Log receiververs上,其中Log receiververs是部署在AWS上的。其次,Log receiververs将数据上传到Kinesis stream上,其中Kinesis stream是负责托管服务的。再次,Kinesis stream将数据上传到经典的生产消费者模式区域。最后,数据传输到下游,下游中一部分负责把数据源源不断的写进ES里去,另一部分负责把数据做相应的解析后写进S3里去,其中S3是负责存储服务的。
经过以上处理后,理论上解决了问题,但实际上还需要进一步的延伸。第一,对ES里面的数据进行聚合,即数据进去是以分钟单位计算的,出来时需要聚合成小时单位;第二,将经常查询的数据放到ES里面,剩下的数据放到S3里面;第三,对放到S3内部的数据进行查询,目前对放到S3内部的数据进行查询的方法是用AWS提供的Athena服务,并用托管的方式去搜索S3上的数据,最后提供一个ECS供用户去查询数据。
ES的作用以及现状
ES 在 ElasticLog中主要负责三件事情,第一,主要用来放一个月的热数据,即每分钟有130万条数据进入,必须提供的是秒级查询;第二,ES应建立在AWS上,需要三个专用主节点与十个数据节点,其中主节点指M4.Load,数据节点指C4.2xBug;第三,它最多需要考虑28个指标参数。
在ES服务中,WAS是首选的服务,因为WAS能够提供相对可靠的ES服务,并且是全托管的服务。它可以从一些集群的维护中解放出来,更多的集中在怎样用ES把业务实现。当ES服务达到一定临界值时,让WAS去管理ES服务。另外,在ES服务中,WAS具有区域意识、易于使用、VPC+安全组、蓝/绿部署以及监视器+自动快照等特点,例如,区域意识指可以在不同的机房将数据进行备份。
ES实践
ES的实践包括四个部分,第一部分是怎样去设计一个Index,使它能够很大程度上影响到class的走向;第二部分是Shard的分配,即在指标里面放多少个Shard是合适的;第三部分是快速的将Spark里的数据写到ES里去;第四部分是数据的去重。四个部分的详细介绍如下:
设计Index
设计Index有两种形式,第一种形式是只设计一个Index,但整个程序建立起来后许多地方都不能进行修改了,只适用于小型数据;第二种形式是按时间去分割,即分割成每周一个Index或每月一个Index,能够有效地解决数据量的问题。
上图是ES实践中的一个设计案例,在这个案例的设计中需做一些考量的性能。第一,ES里面不能放两个doc,因为存放两个同名的doc需要字段类型一致,且多个doc类型已经不被支持,所以一个Index仅需存放一个doc类型即可;第二,如果客户只在乎查询结果,不考虑数据是什么,那么可以将source关掉,这样会有效地节约存储空间;第三,all的数据在做Index时能够做到将所有的数据聚合起来,如果只查找 某一段的数据则可把all关掉,这样不仅能够使空间的利用率得到有效地提升,还能使性能得到优化;第四,可以将dynamic改为strict。
在ES实践中,它是将分钟维度数据聚合成小时维度数据的,这就意味着将数据写进ES时是分钟Index,过一段时间后自动转换成小时维度Index。那么查询时它是怎样将分钟Index切换成小时Index的呢?原因是Index提供了一个aliases功能,这样数据一开始指向分钟Index,当分钟Index服务周期到了后,自动切换到小时Index,其中整个切换在数据的聚合里面去完成即可。
Shard的分配
那么在有了Index的基础上,怎样去合理的分配Shard的个数呢?另外,Shard是落在不同的集群上,又该怎样去选择合适的集群大小呢?具体介绍如下:
在设计好Index之后,进行读取数据量,并估计每周以至每分钟有多少数据量送进来。估计数据量的大小之后基本就能够确定Index有多大以及需要多少个Index,接着确定磁盘空间以及需要多少机器设备。进而算出Shard的个数,其中Shard的个数是数据节点个数的k倍,且建议每个Shard存储不得超过30GB。最后进行相应的测试、调整以及反馈,经过几轮测试后,基本就可以确定业务大概需要什么样的集群,这个集群需要多大了。
快速地将Spark里数据写到ES内
首先,通过一个Spark Streaming程序把数据源源不断地写进ES里去,接着,Spark调用ES-Hadoop库把数据写进ES当中。那么怎样快速地将Spark里的数据写到ES里去呢?除了调整一些ES参数以外,还需要将Spark进行并行计算和ES进行并行写入,使得并行task执行的个数等于集群总共拥有的vCores个数,最终两者对接实现资源的最大利用和快速地写入。
数据去重
在上文讲的架构中,由于Spark在做并行运算时会把任务分到每个task里面去做,如果一个task写入ES中失败了,那么Spark在task里面就会重新写入,从而导致数据出现重复。要想将重复进到ES的数据去重,可以从使用自定义唯一ID、使用聚合来发现复制和删除文件以及做明确的查询三个方面入手。