作者:伍翀(云邪)
摘要: 本文由 Apache Flink PMC,阿里巴巴技术专家伍翀(云邪)分享,主要介绍了Flink SQL和Table API的诞生背景、概念和功能,并通过三个实例演练让观众更直观地了解了Flink及其在Kibana上的具体操作流程。内容如下:
- Flink SQL和Table API诞生的背景;
- Flink SQL和Table API的核心概念及功能;
- 结合Demo进行实战演练
Flink有非常强大的API抽象能力,它提供了三层的API,从底至上分别是Process Function, DataStream API以及SQL和Table API。这三层都有不同的用户群体,越低层灵活度越高,门槛也会越高,最高层门槛较低,但是会牺牲一些灵活度。
为什么要花精力做SQL和Table API?
DataStream API非常好用,因为它的表达能力非常强,用户可以维护和更新应用状态,而且它对时间的控制力也非常灵活。但相对而言,它的复杂度和门槛也更高,并不适用于所有人,很多用户希望专注于业务逻辑。所以,要提供更加简单易懂的API,SQL是目前最佳的选择。
Flink SQL和Table API优势有很多。首先它非常易于理解,很多不同行业不同领域的人都懂SQL,它已经成为大数据处理生态圈的标准语言了;其次SQL是声明式的语言,用户只需要表达想要什么,而无需关心如何计算;然后SQL是会自动优化的,能生成最优的执行计划;同时SQL还是30多年的语言,非常稳定;最后,SQL可以更容易地统一流和批,用同一套系统就能同时处理,让用户只关注最核心的业务逻辑。
SQL和Table API 简介
Flink关系式的API主要暴露两种,一种是SQL的API,还有一种是Table的API。SQL API完全遵循ANSI SQL的标准设计,所以如果有SQL基础,它的学习门槛是比较低的,而Table可以理解为类SQL的编程式的API。他们都是统一的批处理和流处理的API,不管输入是静态的批处理数据,还是无限的流处理数据,他的查询的结果都是相同的。总结而言,就是一份代码,一个结果,这也是流批统一的最重要的评价指标。
Flink的工作流程
下面是比较高级的概览图,SQL和Table在进入Flink以后会转化成统一的数据结构表达形式,即Logical Plan。其中,Catalog会提供一些原数据信息,用于后续的优化。Logical Plan是优化的路口,经过一系列的优化规则后,Flink会把初始的Logical Plan优化为Physical Plan,并通过Code Generation机制翻译为Transformation,最后转换成JobGraph,用于提交到 Flink的集群做分布式的执行。可以看到,整个流程并没有单独的流处理和批处理的路径,因为这些优化的过程和扩建都是共享的。
用实例理解流和批
比如一个点击的文件,有user、点击的时间和URL。如果我们要统计点击的次数,在选出user做统一的批处理的情况下,它的特点就是一次性读入和一次性输出。
而如果Click是一个数据流,在这种情况下,输入一条数据后就能输出一个结果,比如Marry第一次点击会记录一次,第二次点击就会做增量计算。所以输入数据会持续读入,结果也会持续被更新。
可以看到,这里流和批的结果是一样的,所以我们可以把以前批处理的SQL迁移到Flink上做流处理,它的结果和语义应该和之前的批处理是一样的。
Flink SQL和Table应用案例
典型的包括低延迟ETL处理,比如数据的预处理、清洗和过滤;还有数据管道,Flink可以做实时和离线的数据管道,可以构建低延时实时数仓,也可以实时数据同步,把数据从某一个数据系统同步到另一个数据系统;
第三种是流式和批式的数据分析,去计算和更新离线或实时的数据,并进行可视化,典型的比如阿里双11的大屏;
最后一种是模式识别,也就是实时地识别数据流中符合某种pattern的事件流,然后做相应的监控或者报警的服务,比如网约车的一些异常事件的监控服务。
Flink的核心功能
下图包含了Flink的一些核心功能。第一个是SQL的DDL,DDL直接对接外部系统,它的强弱决定了Flink与外部系统的联通性,而作为一个计算引擎,与外部数据存储的联通性非常重要;第二是完整的类型系统,它支持多种数据类型,这对SQL引擎而言也是非常必要的;第三是高效流式TopN,有非常强大的流处理能力,用来实时计算排行旁,比如双11的销量排行榜;还有高效的流式去重对数据进行过滤,因为有时采集会包含重复的数据;还有维表关联、对接CDC等。
除此之外,Flink还有非常多的内置函数,支持MiniBatch,以及有多种解热点手段。它还支持完整的批处理,适用Python等语言,还有Hive的集成等功能,不仅能直接访问Hive的数据,还兼容了Hive的语法,让用户不必再频繁切换。
示例
下面是一个电商的用户行为的实时分析。我们从Kafka中实时地消费用户的行为数据,并与MySQL中的数据进行关联,然后写入Elasticsearch的索引中,并用Kibana进行视觉化呈现。这是一个端到端的实时应用的构建。
下面是在Kibana上的最终展示成果,会有面板进行实时监控,显示出包括当前的独立用户数、类目排行、各时段购买量等数据。
下面是来自某宝的用户行为日志,我们只选取了11月27日当天的行为,它包含这几个字段,包括用户ID、商品ID、商品类目ID、行为类型和时间戳。行为类型中,pv代表点击,buy代表购买,cart代表加入购物车,fav代表收藏事件,而时间戳代表事件发生的时间。
实战演练
演练的示例代码已经传到了Github,大家如果有兴趣也可以按照这个文档一步一步做下去。我们准备一台装有Docker的Linux或者MacOS计算机即可,不需要下载额外的包。
首先,我们新建一个目录,比如叫flink-sql-demo,然后把docker-compose的demo文件下载下来,可以点进去看一下这个文件。
这里面有个dategen的数据源,我们可以去控制它的产生速度,比如把产生的速度从2000改成3000。
我们通过docker-compose up-d把docker中的容器都启动起来。容器包括Jobmanager、Taskmanager这两个Flink的集群,还有Kibana、Elasticsearch、Zookeeper、MySQL、Kafka等。
我们可以用Docker-compose的命令看一下Kafka中最新的10条数据。它有user ID,有商品ID,有类目ID,有用户的行为,还有一个TS代表这个行为当时发生的时间。
随后我们启动今天的主角,通过Docker-compose启动SQL-Client容器,当看到这个松鼠的时候,SQL Client就成功启动了,我们可以在里面运行SQL的命令。
第一步我们要用DDL创建数据源,把用户日志的数据源先创建起来。我们用Create Table这个DDL语法创建了一个user behavior的表,它里面有5个字段,包括user ID,商品ID,类目ID,用户行为和TS时间戳。With里面跟的是一些如何连接到外部系统的属性,比如用Kafka连接外部的topic。
另外我们也能通过show table看user behavior,用describe table看表的结构、字段、计算列、watermark策略等等。
我们也可以进到8081端口,这是Docker compose旗下Flink集群的一个Web UI界面,这里面各个栏目大家都可以去看看。
接下来我们用3个实战去画一些图表,深入了解Flink的一些功能。
首先是统计每小时的成交量。我们先用DDL创建Elasticsearch表,定义每小时的成交量,随后提交Query去做每小时成交量的统计分析。
我们需要做每小时的一个滑窗,用到Tumble Window语法。Tumble的第一个字段定义时间属性,也就是刚刚说的TS事件时间,然后开窗大小是一小时,也就是说我们每小时会滑一个窗口,然后对窗口内的数据做统计分析。
我们提交这个Query,然后通过5601端口访问Kibana对它进行可视化。刚进来的时候是空的,里面什么数据都没有,所以我们一般要先创建 create index pattern,通过页面的Management中的Index Pattern进入,找到我们的索引,点击进去创建。
创建了Index Pattern以后,我们才能在当中做一些Discovery或者可视化。可以看到,这些字段就是我们刚才DDL里定义的,并且它还有对应的值。
当然我们最终是要进行可视化,所以我们要创建一个Dashboard。在页面左上角点击Dashboard,然后点击Create New创建新的视图,随后就可以设置每小时的成交量了。
我们画一个面积图,在Y轴上选择购买量max,标签名字改为“成交量”,然后因为X轴展示时间,所以选择“hour-of-day”,order by字母序,改为24,随后点击播放键,面积图就画好了。我们也可以点击保存,这样这个面积图就会保存到Dashboard上。
随后我们再画一个图,统计一天每10分钟累计独立用户数。同样,我们需要现在SQL CLI中创建一个Elasticsearch表,用于存储结果汇总数据,字符段包括日期时间和累计uv数。
然后,我们在SQL CLI中执行Table。
这里面Query主要做了一件事,就是把日期和时间选出来。这里唯一有一点特殊的是,因为我们的需求是做每10分钟的点,所以用Substr的两个竖线做连接的函数来实现。随后,我们像之前一样把Query提交到SQL CLI里面运行。
和之前一样,我们创建新的视图,这里我们创建的是一个连线图。
我们在Y轴取uv的值,命名为“独立访问用户数”,X轴选terms,然后选time-str,order by Alphabetical,一天中改为150个点。然后点播放,独立用户数的曲线图就出现了。同样,我们可以点击保存,把这个图加到Dashboard上。
随后我们来画第三张图。第三张图是*类目排行榜,因为一个商品对应的类目太细分,比如对应非常细的第三四级类目,所以它对排行榜意义可能不大。但是我们希望归约到一个*类目做统计分析,所以开始之前准备了MySQL的容器,里面准备了子类目和*类目的映射关系。
我们首先在SQL CLI中创建MySQL表,后续用作维表查询,同时再创建一个Elasticsearch表,用于存储类目统计结果。而在Query这里,我们会用到Create View语法去注册一个临时的视图,简化写法,因为把两个Query写在一起可能会比较复杂。
我们同样在SQL CLI中运行代码,然后进入到Kibana页面建立索引和添加可视化图表。这里我们使用Horizontal Bar去画一个柱形图。
在Y轴上,我们统计类目的成交量,X轴用类目的名字,排序用倒序排列,随后点击播放键。
最后,我们同样点击保存,把类目排行榜添加到Dashboard上。加上之前我们做的两个,Dashboard上就有3个图表了,这里可以自己拖拽图表美化一下。
以上就是今天的课程,大家也可以到Github上的文档中再去学习和实践。
活动推荐:
仅需99元即可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方链接了解活动详情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506