两条水位线的业务需求分析-Interval JOIN方案(转载+自己分析整理)

虽然我们利用UnBounded的JOIN能解决上面的问题,但是仔细分析用户需求,会发现这个需求场景订单信息和付款信息并不需要长期存储,
比如2018-12-27 14:22:22的订单只需要保持1小时,因为超过1个小时的订单如果没有被付款就是无效订单了。
同样付款信息也不需要长期保持,2018-12-27 14:22:22的订单付款信息如果是2018-12-27 15:22:22以后到达的那么我们也没有必要保存到State中。 
而对于UnBounded的双流JOIN我们会一直将数据保存到State中,如下示意图:

两条水位线的业务需求分析-Interval JOIN方案(转载+自己分析整理)

这样的底层实现,对于当前需求有不必要的性能损失。所以我们有必要开发一种新的可以清除State的JOIN方式(Interval JOIN)来高性能的完成上面的查询需求。

什么是Interval JOIN

Interval JOIN 相对于UnBounded的双流JOIN来说是Bounded JOIN。就是每条流的每一条数据会与另一条流上的不同时间区域的数据进行JOIN。对应Apache Flink官方文档的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。

 

Interval JOIN 语义

Interval JOIN 的语义就是每条数据对应一个 Interval 的数据区间,比如有一个订单表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假设我们要统计在下单一小时内付款的订单信息。SQL查询如下:

SELECT 
  o.orderId,
  o.productName,
  p.payType,
  o.orderTime,
  cast(payTime as timestamp) as payTime
FROM
  Orders AS o JOIN Payment AS p ON 
  o.orderId = p.orderId AND 
  p.payTime BETWEEN orderTime AND 
  orderTime + INTERVAL '1' HOUR

上面这个的意思是:

选择数据的要求是:

支付时间在下单时间一小时以内(同时要求orderId相等)

------------------下面是具体举例----------------------------------------

  • Orders订单数据
orderId productName orderTime
001 iphone 2018-12-26 04:53:22.0
002 mac 2018-12-26 04:53:23.0
003 book 2018-12-26 04:53:24.0
004 cup 2018-12-26 04:53:38.0
  • Payment付款数据
orderId payType payTime
001 alipay 2018-12-26 05:51:41.0
002 card 2018-12-26 05:53:22.0
003 card 2018-12-26 05:53:30.0
004 alipay 2018-12-26 05:53:31.0

符合语义的预期结果是 订单id为003的信息不出现在结果表中,因为下单时间2018-12-26 04:53:24.0, 付款时间是 2018-12-26 05:53:30.0超过了1小时付款。

所以上面两个表格中红色数据表示下单后没有被及时支付,订单被取消。

 

所以有效订单的预期结果如下 :

orderId productName payType orderTime payTime
001 iphone alipay 2018-12-26 04:53:22.0 2018-12-26 05:51:41.0
002 mac card 2018-12-26 04:53:23.0 2018-12-26 05:53:22.0
004 cup alipay 2018-12-26 04:53:38.0 2018-12-26 05:53:31.0

 

完整代码是(flink1.11.2 Java版本):

https://gitee.com/appleyuchi/Flink_Code/blob/master/table_api/Java/src/main/java/SimpleTimeIntervalJoin.java

代码运行结果如下:

001,iphone,alipay,2018-12-26T04:53:22,2018-12-26T05:51:41
002,mac,card,2018-12-26T04:53:23,2018-12-26T05:53:22
004,cup,alipay,2018-12-26T04:53:38,2018-12-26T05:53:31

 

Reference:

Flink-1.9流计算开发:十六、intervalJoin函数

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

 

 

上一篇:metabase sql统计留存率


下一篇:EdgeX系列之五 -- 设置定时任务