在1985年的电影《回到未来》中,由发明家Doc Brown设计的神器磁通电容器(flux capacitor)让Marty Mcfly拥有了穿越时空的能力。而数据库一般是具有ACID四大特性,需要考虑时间对数据库的影响。一直以来,弄清楚如何管理和建模时间数据以进行有效的时间点分析是一项长期的研究过程,最早可以追溯到80年代早期,2011年才引入了SQL标准中的时态表(temporal tables)。到目前为止,用户注定要将其作为应用程序逻辑的一部分来实现,这通常会延长开发生命周期以及损失代码的可维护性。此外,虽然没有统一的、普遍接受的时态数据( temporal data)定义,但它所代表的挑战是相同的:如何根据动态变化的历史数据集验证或丰富数据?
例如:如果出租车票价事件与乘坐地点的当地货币挂钩,我们尽可能的希望将票价转换为通用货币以便进一步处理。这是由于汇率在一段时间内的波动会很大,为了产生可靠的结果,每个出租车费用事件都需要与事件发生时的有效汇率相匹配。
使用Flink建模时态数据
在1.7版本中,Flink已将时态表的概念引入其流式SQL和表API中:仅附加表的参数化视图、或者仅允许插入记录、从不更新或删除记录的任何表,这些表被解释为变更日志,并将数据与时间上下文紧密相关,以便只能在特定时间段内将其解释为有效。将流转换为时态表需要以下两步:
- 定义主键和版本控制字段,可用于跟踪随时间发生的更改;
- 将流公开为时间表函数,将每个时间点映射到静态关系。
回到上面的示例用例中,时态表正是我们对汇率数据进行建模所需要的,例如对时间点查询有用。临时表函数是作为Flink通用表函数类的扩展实现的,并且可以用与表API或SQL解析器一起使用来定义。
import org.apache.flink.table.functions.TemporalTableFunction;
(...)
// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
// Provide a sample static data set of the rates history table.
List <Tuple2<String, Long>>ratesHistoryData =new ArrayList<>();
ratesHistoryData.add(Tuple2.of("USD", 102L));
ratesHistoryData.add(Tuple2.of("EUR", 114L));
ratesHistoryData.add(Tuple2.of("YEN", 1L));
ratesHistoryData.add(Tuple2.of("EUR", 116L));
ratesHistoryData.add(Tuple2.of("USD", 105L));
// Create and register an example table using the sample data set.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
tEnv.registerTable("RatesHistory", ratesHistory);
// Create and register the temporal table function "rates".
// Define "r_proctime" as the versioning field and "r_currency" as the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
tEnv.registerFunction("Rates", rates);
(...)
实际上,这个Rate
函数的作用是什么呢?假设我们想检查给定时间的汇率,比如11:00这一时刻。我们可以简单地通过下面的sql语句得到答案:
SELECT * FROM Rates('11:00');
尽管Flink还不支持使用常量时间属性参数查询时态表函数,但这些函数可用于覆盖更有趣的场景:时态表连接。
使用临时表流式连接
时态表在与流数据结合使用时发挥了其全部潜力。例如,为应用程序提供动力,这些应用程序必须针对一个参考数据集连续地列出白名单,该数据集随着审计或法规遵从时间而发生变化。由于计算成本和资源的消耗,高效连接长期以来一直是查询处理器面临的持久挑战,但在流式数据上的连接带来了一些额外的挑战:
- 流的无边界特性意味着输入被连续评估,并且中间连接结果可以无限地消耗内存资源。Flink优雅地管理其内存消耗(即使对于连接需要溢出到磁盘的较重情况),并支持时间窗口连接以限制需要保持为状态的数据量;
- 流数据可能是乱序的或者有延迟,因此不可能预先强制执行排序,并且时间处理需要一些思考以避免不必要的输出和撤销。
在时间数据的特定情况下,时间窗连接是不够的(至少在不进行一些代价调整的情况下是不够的)。这会导致迟早会发生每个参考记录将落在窗口之外并从状态擦除的情况,不再正在考虑将来的联接结果。为了解决这个限制,Flink引入了对时态表连接的支持,以涵盖时变关系。
探测端(Taxi Fare
)上仅附加表中的每个记录与构建端(Conversion Rate
)上的时间表中的记录版本相连接,该版本与探测端记录时间属性(time
)找最接近匹配的主键(currency
)值。还记得之前注册的时态表函数(Rates
)吗?现在可用于将此连接表达为一个简单的SQL语句,否则需要使用子查询复杂语句。
时态表连接既支持处理语义, 也支持事件时间语义,并有效地限制保持在状态中的数据量,同时还允许构建端上任意旧的记录,这与时间窗口连接相反。探测端记录只需要在很短的时间内保持状态,以确保存在无序记录时正确语义。本节开头提到的问题可以通过以下方式克服:
- 缩小连接的范围:对于给定的
taxiFare.time
只有时间匹配版本的ratesHistory
可见; - 从状态中修剪不需要的记录:在当前时间和水印(watermark))延迟之间的记录对于探测端和构建端都是持久的。一旦水印到达并且结果被发出,这些将被丢弃,允许连接操作在时间上向前移动,构建表在状态下“刷新”其版本。
结论
根据上面的内容可以总结到,可以使用Flink在关系和时变参数中能够表达丰富的连续流,而不必涉及语法拼写或者对性能有所影响。换句话说:流式时间旅行无需磁通电容器。将此语法扩展到批处理,以使用适当的(事件)时间语义来丰富历史数据,这也是Flink路线图的一部分!
如果想在使用Flink SQL(通常是Flink SQL)连接流方面获得一些实际操作实践,这里有一个免费培训项目,培训环境基于Docker,只需几分钟即可建立。
作者信息
本文由阿里云开发者社区组织翻译。
文章原标题《Flux capacitor, huh? Temporal Tables and Joins in Streaming SQL》
作者:morsapaes
译者:海棠
文章为简译,更为详细的内容,请查看原文