分享人:弦望 解决方案架构师
元风 Flink产品专家
正文:本文从三方面来介绍互联网、游戏等行业实时数仓构建最佳实践。
Ÿ 最佳实践原理讲解
Ÿ 核心产品实时计算Flink讲解
Ÿ 最佳实践系统搭建
一、最佳实践原理讲解
1)场景描述
本方案的核心场景是互联网、游戏等行业客户,对业务数据的实时反馈尤为重要,比如游戏的实时战斗数据、跟AI结合的实时反馈数据等,往往会基于Flink构建实时数仓。本方案通过Logtail采集服务端日志,使用Flink实现对日志的拆分、处理,根据业务需求写入不同的数据存储,实时分析数据使用ADB存储,离线分析数据 使用HDFS存储,构建高效、稳健的实时数据服务。这也是行业内构建实时和离线数仓、实时离线分离最通用的方案之一。以下几个场景非常适合使用本方案:
Ÿ 日志的采集、处理及分析、日志投递到SLS日志服务
Ÿ 日志使用Flink拆分到不同Logstore,使用Flink对日志数据进行ETL。
Ÿ 日志使用Flink实时写入HDFS,ETL后写入离线数仓。
Ÿ 日志数据实时ETL构建实时数仓。
Ÿ 日志实时写入ADB进行实时查询分析等场景。
2)系统架构图
这是来自客户的一个架构,客户的业务日志存放在ECS上,并且多种日志是汇聚在一起的,如广告日志、自定义事件、交易日志、登录日志等,需要把不同的日志拆分到不同的Logstore,客户有80多种日志类型,如果通过Flink内置的connectore组件实现需要写80多组SQL,而且不好维护。为了解决这个痛点,本方案通过自定义Sink实现日志拆分,一组FlinkSQL,业务逻辑全部通过自定义代码完成。
客户的业务日志数据需要进行实时ETL,这里为了结合业务和维表的缓存能力,我们使用RDS、Hbase、ADB等维表实现日志实时ETL。
实时数据需要处理完后实时写入ADB,供业务系统做后续数据实时分析使用。
离线数据通过Flink写入EMR的HDFS。Flink组件未包含写入HDFS的组件并且开源的组件逻辑也不能满足业务需求,这里需要通过我肯定义Sink实现。
测试从日志落地到写入ADB的总体延时,即测试Flink在全链路处理数据的时效性验证。
3)方案优势
Ÿ 性能优越:Flink集群拥有高吞吐和高扩展性,是业内公认的实时计算最新一代的引擎。
Ÿ 安全稳定:支持Exactly-Once保障数据的唯一性和可恢复性,支持故障自动恢复,资源隔离。
Ÿ 简单易用:支持标准的SQL语言,可实时在线开发,全面支持UDX实现复杂的业务逻辑。
Ÿ 功能强大:支持SQL进行实时及离线数据清洗、,数据分析、数据同步、异构数据源计算等数据湖相关功能,以及各种流式及静态数据源关联查询。
4)实时计算产品架构
左侧数据源可以来自ECS、RDS及IOT等设备产生的数据,数据源通过消息队列如SLS、MQ、Datahub等产品进入实时计算引擎。在实时计算中对数据进行实时ETL写入下游数据存储,如Hologress、MaxCompute、Hbase、ES等产品。Flink支持 SQL和DataStream API编写满足简单和复杂应用场景,支持自定义connector,支持CEP Rule engine、Flink ML、Gelly等引擎库,同时支持Web IDE开发和强大的数据监控能力。
二、核心产品Flink讲解
1)实时计算-技术架构
在实时计算Flink板全托管模式中一共分成两个主要的组成部分:第一层是开发平台,第二层是执行引擎。
在开发平台中,我们提供Verveica Platform。Verveica是德国的Flink创始团队所成立的公司,阿里巴巴在2009年收购了这个公司,并在去上提供商业板产品,在执行引擎上我们提供开源版的Flink内核,在开源版内核的基础上提供远程Shuffle,包括高业版的状态存储,以及对SQL作业算子的深度优化。
在开发平台上面,我们做了可视化的SQL编辑器,提供SQL Debug功能,提供全链路的商业版Connectors,扩展积极学习库,并且在Operator级别做了状态管理,为大家提供AotuPilot智能高优,可以根据作业的规模来增减作业资源使用量,在云上提供Java与Python的作业支持,为大家提供99.9%的SLA企业级安全以及全链路控制。
2)实时计算与开源Flink对比
可以看到对比Flink,我们一共有这些主要的功能 ,在SQL、部署、优化、状态存储、资源管理、机器学习包括监控报警上,我们领先开源。我们产品主打的从开发到运行,主打的几个核心功能。
3)在线调试
首先是SQL作业的在线调度,对于流作业的开发SQL往往是我们的第一选择,但是SQL在开发的过程中,由于它语意与最后执行引擎是不统一,所以经常会造成代码与逻辑的混乱,所以在云上我们为大家提供给予SQL的调试能力,这样的话我们可以直接看到SQL的执行结果 。
4)Session Cluster
在执行层,我们为云上用户提供小规模的Session集群,在Session集群中,默认多个作业是共享相同资源,这样可以最大化地节省资源的使用,同时提升资源的利用率,并且对于小规模的作业,由于Session集群是提前创建完毕的,可以节省状态的下载时间。当作业负载达到一定程度时,可以手动将作业从Session集群迁到Per-Job集群里。
5)AutoPilot
分配算力时,我们尽可能的将计算能力与真实的流量进行匹配,我们就可以在流作业的峰谷对作业进行资源的扩充以及资源的消减,可以最大化地节省资源的利用,保证作业平衡运行。
三、最佳实践系统搭建
使用云架构设计工具CADT来拉起今天要演示的环境,搜索实时数仓,互联网游戏等行业实时数仓,基于模版新建。
在这里要对一些参数进行修改,改一下OSS配置Bucket的名称。
Ecs的密码需要设置一下。
日志服务Project的名称设置一下,名字要是唯一的。
Emr也要设置一下密码。
RDS数据库也设置一下密码。
保存。输入一个自定义的名称。
保存后开始应用部署。
资源校验成功了。
看一下部署需要的相关价格,这里列出来免费和后付费的价格,这里有详细的报告可以看。
勾选协议,创建资源。
可以看到资源已经成功创建了。
接下来创建数据库,在这里进入到rds的控制台。
进入rds控制台,这是我们的帐号。
创建数据库。
接下来设置白名单,目前已经在白名单里面了。
我们把emr、ecs的安全组添加到白名单。
进入adb控制台,创建帐号。
创建完成。
登陆数据库。
添加ems的白名单,再登陆就可以了。
登陆实例,创建数据库。
回到adb控制台,找到数据安全,添加白名单。
把vpc的网段也添加进来。
网段可以在CADT里看到。
接下来创建实时计算,我们这里Blink独享集群,目前我们推荐使用Flink全托管产品,现在已经把Blink的特性和开源版社区的能力融合在一起了,形成了现在的Flink全托管。今天的演示使用Blink独享集群来做。
我们master选1个就可以了,Slave选择3个,购买。
接下来创建集群。
OSS刚才也跟随创建了,这里选一下,VPC是VPC Flink,Zone选e可用区,创建。
接下来等待集群创建,现在集群已经创建好了,我们开始创建项目。
创建项目。
我们在日志服务台看一下这个项目。这是我们创建的Project。
这是我们创建的logstore。
这里选择sls的服务器。
执行安装成功,确认安装完毕。
输入名称。
看一下核心组,是成功了。
配置名称和日志路径。
手动输入日志样例,填写日志抽取内容。
上传原始日志,进入下一步。
我们的logstore就创建成功了。
从这里找hbase控制台。
修改分组白名单,把vpc的网段加进去。
我们所有的基础环境的配置就完成了。
这个是我们的公网IP。
登陆一下。
下载原始日志,首先进入目录,接下来创建目录。
使用命令去生成一个模拟的自主文件。
接下来登陆日志服务的控制台,看一下日志生成的情况,可以看到日志已经成功上传到日志服务了,说明我们的采集已经生效了。
接下来把日志拆分到不同的logstore,可以看到evenkey有很多种类型,原始日志是汇集到一个文件里面的,接下来把不同的日志文件分别拆分到不同的logstore里面。
我们通过给大家提供的源码来进行创建,大家可以导入这个源码去创建logstore,这里需要把AK替换成自己的,日志服务也替换一下,这里使用新创建的logstore。
接下来运行一下,可以看到成功了,可以看到预处理成了Create案例。
我们再回到日志服务的控制台,看一下创建完成了。
可以看到刚刚这些通过代码创建的logstore都已经创建了,这里需要注意给所有logstore开启索引。
接下来在rds控制台登陆数据库。
添加上白名单,登陆。
登陆成功后,先创建尾表,点poc的数据库,点执行,创建尾表,成功创建了。
接下来往尾表里传数据,刷新一下,执行可查询的数据,可以看到类型和logstore已经做了一一映射的关系。
接下来对源码进行编译,创建一个新的编译clean install,开始编译源码。
可以看到build成功,编译完成了。
回到Flink的项目,已经创建好了。
进入项目,进到Flink的开发,首先上传资源,新建资源。
上传一个刚刚打包出来的价包,选择这个价包。
上传。
上传完成之后,我们去新建一个作业,这个作业用于拆分到不同的logstore。
从Demo库里把Flink SQL拷贝过来,这时注意要修改一下AK的信息、project、logstore相关的信息。
替换一下rds尾表的地址,去拷贝rds的内网地址。
数据库是poc,用户名改一下,把输出的logstore也改一下,project也替换一下。
作业已经自动保存了,在这个作业里,我们使用了自定义的价包,资源还要去引用一下。
上线作业。
通过后,自动调优,上线。
上线后,进到运维的界面。
启动作业,时间选择较早的时间,保证当前是有数据的。
进入作业详情,看一下作业的拓扑图。
任务失败,排查一下原因,进入Jobmanager里面。
可以看到Jobmanager.log。
搜索一下看有没有错误,这个是连接失败了。
我们排查一下网络连接的问题,原来是刚才帐号输错了。
把原来任务停止,重新启动一下作业。
再进来看一下,现在数据能正常输入输出了。
刚刚也给大家介绍了如何在Flink里面如何排错。大家在Jobmanager里看到Flink日志的。刚才Jobmanager.log是认证失败的,排查数据库的帐号输错了,重新上线启动了作业,现在好了。
如果要看当前Flink集群的IP地址是什么,哪台机器在处理这个作业,可以在日志中心里看到IP。
可以在日志服务的控制台里看看,可以看到日志已经被拆分到不同的logstore里面了。这是根据rds尾表里的映射关系,把原始不同类型的日志拆分到不同类型的logstore。
接下来处理日志实时的etl,进入到ecs,我们把hbase的客户端部署到ecs上,这里在实验,为了节约资源的关系,直接把hbase客户端部署到日志上,生产的时候可以独立地去做部署,部署在这台机器上,开始安装。
安装完成后,去编辑下hbase的配置。
需要去hbase控制台,找到ZK连接地址,复制过去。
接下来进入到hbase客户端的命令端,用shell启动客户端。
启动之后创建一个hbase的尾表,比如创建一个item的尾表。
成功创建了,可以看到发表已经成功创建。
往表里插一组数据,可以看下数据是不是成功插入。
可以看到数据已经成功插入了。
接下来在hbase里添加Flink安全组。
确认一下Flink集群当前的安全组是哪一个,勾选添加。
创建logstore,用来去做数据etl之后的存储,在这里创建logstore,永久保存,创建。
在实时计算里面创建开发任务,新建作业。
再从Demo库里拷贝Flink SQL,这里面的信息也要替换一下。
拷贝数据库的连接地址。
把AK、project、logstore都要替换。
保存后上线。
我们再启动作业。
成功运行后,我们看一下拓扑图。
看一下有没有数据的产生。可以看到有数据输入和输出。
在日志服务控制台看一下,开启索引,如果查不到,可能是没有开索引。
重启下原来的作业。
重启后,可以查询到了。有些空的,是因为有些字段不完整。
把日志实时同步到adb里面。首先创建adb的结构表,进入在dms控制台,点poc,创建一个adb的表,已经成功创建了。
接下来在实时计算里面创建一个Flink SQL,去把刚刚处理完的日志数据往adb里面去写,新建一个作业。
拷贝一下Flink SQL,修改一些AK等信息,进行替换,logstore是一致的,都确认一下。
把adb的链接拷贝过来,看下用户名、密码。
上线。
进入运维,启动作业,选择较早的时间。
启动完后,看下数据的情况。
可以看到数据在输入和输出。
进到adb的控制台,查询下数据是否已经写入。
查看下数据延时的情况,这个表里面记录了原始日志的时间,可以分析从日志生产的时间到我们进行一系列上传、拆分,看下数据的平均延时是多少。
可以看到这个是刚才查出来的延时,可以测全链路,通过实时计算处理这个延时是多少。我们这个方案里还包含离线,我们把数据写到edf里面,这是一个实时的方案。
我们看一下emr的安全组是不是对Flink已经放行了,进入emr集群,看下emr的安全组,添加一下端口,全部放开,授权给实时计算的安全组,保存。
在实时计算里新建作业。
再把Flink SQL写进去。
把日志里的project信息替换一下,找到emr的host,可以在emr控制台找到,直接使用内网地址。
这个是自定义的sink。
这个是从日志服务通过customersink,,去把写到hdfs里面,这里面的基本逻辑是根据时间写不同的目录。
上线。
看一下是什么错误,看一下是否是资源引用的问题,把资源引用上。
上线好后,启动。
进入作业,看下拓扑图。
可以看到数据已经处理了。
下面进入emr集群里看下数据是否真实写入。用emr的公网地址。
已经连接到emr的节点地址上了。
可以看到当前已经有文件写入了,是每小时写一个文件。
接下来看下这个文件里是否有内容。可以看到CustomEvent里面的时间。写进来的日志已经归类到文件了。这个就是我们完成的需要把离线存储的数据按照不同的时间存到不同的hdfs里,后面再做离线分析。
下面做延时的测试,原来的数据先进到adb的控制台,可以把原来的数据清空。
再回到ecs集群,这里先测试1000条数据,执行生成1000条日志。
生成之后可以测试一下延时,可以看到当前延时是5秒,这是平均的延时。
先把数据清掉。
可以看到是1.6秒的延时。
这样我们可以汇总统计,处理多少条数据,在全链路上的延时是多少,因为我们每次启动作业是会有延时在里面。可以看到数据越小反而越快的。可见实时计算在全链路的实时处理上,它的时效性是非常强的。