1.前言
从EMR-3.21.0版本开始,EMR正式发布了Spark Streaming SQL功能,支持使用Spark SQL进行流式数据处理。经过两个版本的迭代,不少用户反馈当使用SQL进行流式作业开发时,查询结果正确性的调试过程比较麻烦。当前,我们需要完成用户真实的数据流开发,才能在结果存储系统中查看结果是否正确。有些数据存储系统又不方便查看,例如Kafka。这里简单罗列几点不便于调试的问题:
- 无法在控制台输出中直观看到SQL的执行结果,传统的需要在输出存储系统查看。
- 数据是变化的:包括输入数据和输出结果都是不断变化的,无法方便看到每个批次的执行结果。
- 每个批次执行的metrics也不方便查看,传统的需要在日志中查找。
除此外,还有一些高级功能也可以考虑到调试工具中,例如:
- 脱离真实数据源的数据模拟功能。
- 数据采样功能。
本文将介绍EMR提供的流式SQL调试功能,它可以很好地解决了SQL调试中的基本需求。高级调试功能也将会在后续的迭代中逐步放出。
注:本功能将在EMR-3.23.0版本提供出来。
2.工具介绍
2.1 演示SQL
我想实现以下功能,将kafka的一个binlog主题同步到Kafka的另一个主题中。
CREATE TABLE rds_binlog
USING kafka
OPTIONS (
kafka.bootstrap.servers='a.b.c.d:9092',
subscribe='rds-binlog')
CREATE TABLE result_table
USING kafka
OPTIONS (
kafka.bootstrap.servers='a.b.c.d:9092',
subscribe='result-table')
CREATE SCAN rds_binlog_stream_read ON rds_binlog USING stream;
CREATE STREAM sync_rds_binlog
OPTIONS(
triggerInterval=1000,
checkpointLocation='/tmp/checkpoint/sync_rds_binlog')
INSERT INTO result_table
SELECT cast(value as string), *
FROM rds_binlog_stream_read;
我现在不确定查询结果是不是符合预期,所以下面我就用到流式SQL的调试功能。上面的SQL需要做一点点修改,这里不能直接将查询结果写到“result_table”,而是改成调试表“stream_debug_table”。Spark会默认创建出这个表。即改成如下:
CREATE STREAM sync_rds_binlog
OPTIONS(
triggerInterval=1000,
checkpointLocation='/tmp/checkpoint/sync_rds_binlog')
INSERT INTO stream_debug_table
SELECT cast(value as string), *
FROM rds_binlog_stream_read;
当我们调试确认没问题后,我们再改回去,正式部署上线运行。执行后,我们就可以在控制台看到如下SQL执行结果:
2.2 操作介绍
流式SQL结果输出界面主要分为三个功能区域:
- 顶部:简单展示流式作业的关键信息,包括:输出模式,批次间隔,批次ID等
- 中间:流式SQL每个批次的执行结果
- 底部:功能键,包括:退出(Quit),自动刷新(Refresh),批次跳转(Goto Batch),结果页跳转(Goto Page),下一个批次(Next Batch),上一个批次(Prev Batch),下一个结果页(Next Page),上一个结果页(Prev Page),当前批次Metrics(Metrics),最后一个批次(Last Batch)。
具体每个功能键的使用说明如下:
- 【Q】Quit: 退出当前界面。
- 【R】Refresh: 触发页面的自动刷新。当你进行【B】,【P】,【+】,【-】,【<】,【>】,【M】,【L】操作会触发停止页面自动刷新,自动刷新会从当前页面批次开始。
- 【B】Goto Batch:手动跳转到某个批次结果。当我们跳转到具体某个批次后,输出界面将会停止刷新,这不会影响流式作业的执行。
- 【P】Goto Page:当某个批次的结果数据较多,一页无法展示时,可以手动跳转到某一页。
- 【+】Next Batch:手动跳转到下一个批次结果。
- 【-】Prev Batch:手动跳转到前一个批次结果。
- 【>】Next Page:手动跳转到下一个结果分页。
- 【<】Prev Page:手动跳转到上一个结果分页。
- 【M】Metrics:查看当前批次的Metrics信息。
- 【L】Last Batch:直接跳转到最后一个批次。
当我们需要跳转到某个批次页面时,会弹出输入框,输入需要跳转到哪一个批次。
这里会提示合法的Batch ID区间,超出区间会提示非法。我们可以使用“ECS”键或者不输入直接回车退出。
对于每一个批次,我们可以查看它执行阶段的Metrics信息,输入“【M】Metrics”键:
当信息比较多时会分页展示,我们可以通过“<”和“>”进行翻页。同样的,我们可以输入“ECS”,“Q”或者“q”退出。
2.3 注意事项
使用过程中有一些注意事项:
- Driver内存保存了每个批次的执行结果,所以会消耗一定的内存资源。长时间运行,或者输出结果较大时可能会出现内存不足问题。
- 建议修改Spark的log4j级别为ERROR,否则INFO/WARN日志会打印到控制台中,影响调试体验。
- 为了支持调试,我们会在Spark中默认创建一个表“stream_debug_table”。如果您已经存在了这个表名,则无法创建出来并且无法进行调试。您可以自己创建一个调试表,例如:
CREATE TABLE my_debug_table USING MEMORY;
3.小结
本文主要介绍了EMR提供的一个流式SQL控制台调试小工具,可以解决基本的SQL正确性调试需求。除此外,一些高级的调试功能也在开发中。下面是录制的一段视频,直观感受下整个工具的使用: