Flink基础(124):FLINK-SQL语法 (18) DQL(10) OPERATIONS(7)Joins(1)Regular Joins

0 Joins 

Batch Streaming

Flink SQL supports complex and flexible join operations over dynamic tables. There are several different types of joins to account for the wide variety of semantics queries may require.

By default, the order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. You can tweak the performance of your join queries, by listing the tables with the lowest update frequency first and the tables with the highest update frequency last. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which are not supported and would cause a query to fail.

1 Regular Joins

  • 左、右侧新记录或回撤记录,都会去对侧,查找匹配的记录,并影响整个连接结果
  • 如果在左侧有一个新记录,那么它将与右侧所有以前和将来的记录连接起来(反之亦然)
  • 不区分condition里面的时间条件,左右两侧流数据可以长时间保存

1.1 普通Join实例

我们以商品价格表和商品名称表为例进行数据处理流程说明

应用场景:实时查看商品价格及商品名称:

SELECT id, price, name FROM Price_Table A
Left Outer JOIN Name_Table B
ON A.id = B.id
  • 左侧商品价格随时间会发生变化
  • 右侧商品名称表,可以输入商品修改后名称

1.2 原理架构

Flink基础(124):FLINK-SQL语法 (18) DQL(10) OPERATIONS(7)Joins(1)Regular Joins

 

 

 

输入数据:

①–⑥号输入数据,代表随时间增加,按顺序输入事件到左、右侧流。
①–⑥号输入数据,可以对应到最右侧Join后输出数据,-号代表回撤,+号代表最新数据
输入数据会更新左、右侧状态数据(内部一般都是MapState实现,如果当前侧为Outer,还会存储引用数,用于计算回撤)


处理数据(见图中#processElement这一部分):

通过joinKey,可以关联成AssocitedRecords(每次处理数据前都会执行)
左右侧数据分别更新leftState、rightState(retract表示删除或计数减1,accumulate表示新增或计数加1)
以左侧一个LeftOuter视图为例:
1、如果输入消息为Accumulate消息(本例子中输入的都是这类):
> 如果对侧无关联数据,则往下游发Null值
> 如果对侧有关联数据,则遍历关联数据,将join结果依次发给下游(消息header为+或者- +两条)
> 如果对侧也是Outer,则需要将对侧对应的数据引用+1
2、如果输入消息为Retract消息(本例子中输入无这类消息):
> 如果对侧无关联数据,则往下游发Null值
> 如果对侧有关联数据,则遍历关联数据,将join结果依次发给下游(消息header均为-)
> 如果对侧也是Outer,则需要将对侧对应的数据引用-1
1.3  Regular Join特性

从普通SQL Join看到,Join条件中不带时间限定字段(a.ctime > c.ctime - INTERVAL 5 MINUTE这种类型):

  意味着左右侧状态需要长时间保存
  如果一个或两个输入表都在不断增长,那么资源的使用也会无限增长


Flink提供了一个不是非常精确的方式,来限制左右侧状态大小:

  即配置minRetentionTime毫秒数(例如一天对应的毫秒数,按数据写入、访问时间,lazy方式清理超时数据)

  minRetentionTime时间属于SQL TableConfig的一个通用配置,最后创建了StateTtlConfig用来清理MapSatate数据,参考Flink State TTL 概述
  minRetentionTime时间也用于创建window EmitStrategy对应的allowLateness,来控制window容忍的最大延迟时间(这里不再展开,可以参考Window相关的文章)


1.4 Join state(存储状态数据结构)
上面图示中,我们多次提到左、右侧状态数据,以及引用计数。
可见状态数据存储十分重要,其实现方式也是双流Join且支持回撤的基础。

 

1.5 场景举例

场景一

加入要计算某天对于第7天的留存率,那么对于传统关系型数据库来说,我们只需要计算出留存用户,然后和当天的用户活跃数去做个比值就OK了。

insert into retention_user
select a.id,b.id
from a left join b on a.id = b.id;
where a.dt = date_add(b.dt,-7)

这样就假设筛选出了一个临时的留存用户表,采用的是left join。左边a.id是当天所有用户,而b.id是七天后a中的用户仍然活跃的用户

select count(b.id)/count(a.id)
from retention_user;
场景二

对于一个订单日志,和付款日志表,需要筛选出在下单后的20分钟内付款的数目

select *
from a join b on a.id=b.id where a.ordertime>=b.paytime-1200000;

但是对于上面两个场景,都是简单的认为这个表是有界的,全局的,静态的。也就是说,每次操作都能够对全局的数据进行join。
但是呢,如果是我们的实际情况,比如饿了吗,点单流不断的涌入,付款流也可能不断地涌入,场景二中,我们的数据就变成*的了(当然从时间上,这种*的数据流也可以成为有界的数据流,即对时间上20分钟做一个划分而已,当然对于传统关系型数据库去做那种批处理的话,只要筛选出对应时间段即可了,就像上面场景二一样。
但是呢,如果这时候新过来一个数据,是不是又得进行一次计算呢,又得加载所有数据,又得计算一次,因为你不知道哪些数据已经计算过了,哪些数据没有计算过。对吧
上面这种对于全局的数据进行join,在Flink Dynamic Table中叫Regular Join,而在流中的话,这些数据都是以Flink Stream状态形式缓存中,这种“缓存”对吧,还是*的,你机器再好缓存这么多还是不理想吧。而且有些数据可能根本就已经过期了
所以,这种方式Regular Join,就是不筛选出任何过期的数据了,但是使用情况不理想,所以一般很少用

Flink基础(124):FLINK-SQL语法 (18) DQL(10) OPERATIONS(7)Joins(1)Regular Joins
/**
         * 创建环境变量
         * */


        StreamExecutionEnvironment fsenv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings fsSetting = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsenv,fsSetting);
        fsenv.setParallelism(1);
        /**
         * 读取流
         * */
        DataStreamSource<orderEntity> orderDataStreamSource = fsenv.addSource(new orderEntitySource());
        DataStreamSource<payEntity> paymentDataStreamSource = fsenv.addSource(new payEntitySource());

        /**
         * 流转成Table
         * */
        Table orderTable = fsTableEnv.fromDataStream(orderDataStreamSource);
        Table payTable = fsTableEnv.fromDataStream(paymentDataStreamSource);

 /**
         * 5分钟=5*60秒=300 000毫秒
         * Regular Join
         * select * from orderTable join payTable on oid=payid
         * where orderTable.orderTime>=payTable.payTime-300000
         * */
        Table selectResult = orderTable.join(payTable, "oid=payid").where("orderTime>=payTime-6000000");
        fsTableEnv.toRetractStream(selectResult, Row.class).print();
        try {
            fsenv.execute("Regular Test");
        } catch (Exception e) {
            e.printStackTrace();
        }
View Code

Flink基础(124):FLINK-SQL语法 (18) DQL(10) OPERATIONS(7)Joins(1)Regular Joins

2 SQL Join语义介绍

Flink基础(124):FLINK-SQL语法 (18) DQL(10) OPERATIONS(7)Joins(1)Regular Joins

INNER JOIN - 内连接,返回满足条件的记录;
LEFT OUTER - 返回左表所有行,右表不存在补NULL;
RIGHT OUTER - 返回右表所有行,左边不存在补NULL;
FULL OUTER -  返回左表和右表的并集,不存在一边补NULL;
CROSS JOIN - 交叉连接,计算笛卡儿积,一般来说代价较大;
SELF JOIN - 自连接,将表查询时候命名不同的别名,进行逻辑变换后,重新连接;

Flink基础(124):FLINK-SQL语法 (18) DQL(10) OPERATIONS(7)Joins(1)Regular Joins

 

 Flink基础(124):FLINK-SQL语法 (18) DQL(10) OPERATIONS(7)Joins(1)Regular Joins

  • Apache Flink目前主要支持INNER JOIN和OUTER JOIN(LEFT/RIGHT/FULL OUTER)
  • 在语义上面Apache Flink严格遵守标准SQL的语义
  • SELF 可以转换为普通的INNER和OUTER。

参考:

https://blog.csdn.net/LS_ice/article/details/100517019

https://blog.csdn.net/weixin_43272605/article/details/105724106

 

上一篇:Flink基础(119):FLINK-SQL语法 (13) DQL(5) OPERATIONS(2) SELECT & WHERE clause/ SELECT DISTINCT(FLINK 1.13


下一篇:Flink基础(120):FLINK-SQL语法 (14) DQL(6) OPERATIONS(3) 窗口 (1) Windowing table-valued functions (Windowi