Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。
什么是view(视图):
视图无非就是存储在数据库中并具有名字的 SQL 语句,或者说是以预定义的 SQL 查询的形式存在的数据表的成分。视图可以包含表中的所有列,或者仅包含选定的列。视图可以创建自一个或者多个表,这取决于创建该视图的 SQL 语句的写法。
视图,一种虚拟的表,允许用户执行以下操作:
- 以用户或者某些类型的用户感觉自然或者直观的方式来组织数据;
- 限制对数据的访问,从而使得用户仅能够看到或者修改(某些情况下)他们需要的数据;
- 从多个表中汇总数据,以产生报表。
(引自:极客学院)
Flink SQL兼容标准SQL,view的作用与标准SQL相同,有几个特点:
- 在Flink SQL中,view是一种临时表
- 与标准SQL一样,视图可以创建自一个或多个表/视图
- 视图的结果不会进行持久化,仅作为计算的中间结果进行传输
- 视图的数据也可以被输出到结果表中
Flink SQL中,视图的语法非常简单,可参考:view语法。接下来我们通过一些例子来实际感受一下视图的作用。
假设在IoT场景中,要过滤出两个厂房中的传感器的异常数据。两个厂房的数据分别发到了datahub的两个不同topic,需要将两个datahub topic中异常数据过滤出来,再汇总。
原始数据结构如下:
- date
- hour
- ip: device ip
- event_id:
DDL -- 定义输入输出数据的数据结构,具体语法请参见 datahub源表/结果表语法,维表相关语法详见Flink SQL维表语法:
-- source1 定义厂房1的topic的数据结构
create table fab1(
`date` int,
hour int,
ip varchar,
event_id BIGINT
) with (
type='datahub',
endPoint='xxxxxxxxx',
project='xxxxxxxxxx',
topic='topic1',
accessId='xXXXXXXXX',
accessKey='XXXXXXXXX');
-- source2 定义厂房2的topic的数据结构
create table fab2(
`date` int,
hour int,
ip varchar,
event_id BIGINT
) with (
type='datahub',
endPoint='xxxxxxxxx',
project='xxxxxxxxxx',
topic='topic2',
accessId='xXXXXXXXX',
accessKey='XXXXXXXXX');
-- 定义结果表1的数据结构
create table sink(
`date` int,
hour int,
event_id bigint,
event_cnt bigint
) with (
type='datahub',
endPoint='xxxxxxxxx',
project='xxxxxxxxxx',
topic='topic2',
accessId='xXXXXXXXX',
accessKey='XXXXXXXXX');
-- 定义结果表2的数据结构
create table sink(
`date` int,
hour int,
event_id bigint,
event_cnt bigint
) with (
type='rds',
url='xxxxxx',
tableName='xxxxxx',
userName='xxxxxx',
password='xxxxxx'
);
-- 维表
CREATE TABLE device_whitelist (
ip varchar,
category varchar,
PRIMARY KEY (ip), -- 用作维表时,必须有声明的主键。
PERIOD FOR SYSTEM_TIME -- 定义维表的变化周期
) with (
type = 'rds',
...
)
写法一,按照批处理系统/数据库的思维来看,这个需求非常简单:
insert into sink
select e.`ip`,e.`hour`,e.`date`,e.`event_id` from
(
select * from fab1
where event_id='00001'
union
select * from fab2
where event_id='00001'
) e
JOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON e.`ip` = d.`ip`
写法二,使用view,将各个复杂SQL模块拆开:
--
CREATE VIEW view1(`date`,`hour`,`ip`,`event_id`) AS
SELECT * FROM fab1
WHERE event_id='00001'
UNION
SELECT * FROM fab2
WHERE event_id='00001'
--
CREATE VIEW view2(`date`,`hour`,`ip`,`event_id`) AS
SELECT e.`date`,e.`hour`,e.`ip`,e.`event_id` FROM view1 e
JOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON e.`ip` = d.`ip`
-- INSERT INTO sink1
INSERT INTO sink1
SELECT * FROM view2
-- INSERT INTO sink2
INSERT INTO sink2
SELECT * FROM view1
Flink中SQL的数据是不断动态变化的,特别是涉及到一些特殊语法(如window级连/嵌套等),需要分步调试每个SQL模块的结果。如果用写法一,会大大增加SQL调试难度。因此,使用Flink SQL,建议使用第二种写法,用view将各个语法块串联,方便调试和排查问题。写法一和写法二最终生成的作业DAG图都是一样的,没有任何区别。一个Flink SQL作业可以同时定义多个输出表,结果可同时被输出到多种数据源中。
如果在使用实时计算产品过程中有任何问题,欢迎在博客下方回复交流。