在日志服务,数据加工功能(功能介绍)用于完成对Logstore数据的预处理,为后续的分析阶段准备数据。本文主要介绍数据加工实践中可能遇到的延迟问题,帮助大家理清延迟现象背后的原因,以及如何去监控、解决延迟问题。
什么是加工延迟
Logstore
数据加工作业是衔接源Logstore到目标Logstore的一个管道,在这个管道上完成富化、预处理、清洗等工作。
日志服务的Logstore首先是一个流式数据的存储,在Logstore下有多个Shard(Shard可以类比为Kafka的Partition概念)。每个Shard即是一个数据分片,像是一个无限大队列(基于落盘文件),它是FIFO的:
- 当生产者向Logstore写入数据的时候,数据默认会随机选择一个Shard做存储(追加到Shard的末尾,每个数据包在Shard上有一个位置,定义为Cursor)。
- 在读Logstore的时候,消费者选定一个位置(Cursor)开始,按从头到尾的方向顺序读出数据。
时间定义
在定义加工延迟之前,再来看几个时间概念:
- 数据时间(EventTime):在数据上可以选择一对Key/Value(例如是自定义的timestamp字段)存储业务上的时间;一般建议在日志服务的保留字段
__time__
上设置EventTime,在使用SLS的索引功能做业务时间筛选时会更加方便。(参考日志服务数据模型)
- 数据接收时间(ServerReceiveTime):生产者写出的数据被服务端接收的系统时间,随着数据的顺序写入,ServerReceiveTime总是在递增。
- 数据消费时间(ProcessingTime):消费者读取这个数据做处理的系统时间。
把ProcessingTime - ServerReceiveTime
定义为加工延迟。在流计算的理论情况下,两者差值逼近0。在SLS实际的加工延迟可以做到1秒内,如果超出了一定阈值,即是加工延迟。
怎样发现延迟
确认目标写入情况
当加工发生延迟的时候,直观的情况是目标Logstore数据缺失。
- 如果目标Logstore建立了索引,可以选择一个更大范围的时间窗口,隔几秒种查询一次,看日志条数据是否有变化。如果日志集中在之前的时间段且数据量有逐步增加,那比较大概率是发生了延迟(且加工在继续工作)。
- 另一种情况,目标Logstore上没有索引,可以通过预览的方式,确认是否有新的数据在写入。重复查询几次,例如最近15分钟有数据写入且写入的数据EventTime比较老,也有可能是加工延迟的原因。
这两个方法,除了用于确认加工延迟以外,也推荐用来观察加工延迟的恢复进度、解决加工结果数据找不到的情况。
查看指标
数据加工默认包含一个诊断仪表盘(仪表盘介绍),登录SLS控制台,选择对应的加工作业进行查看。
下图就是一个延迟严重的情况:
而理想情况下加工延迟是:
配置告警
如果需要时刻关注加工延迟,可以配置一个告警,当延迟稳定发生时,SLS把告警推送钉钉群或者短信。
如何对加工配置告警,可以参考加工告警配置指南操作。
在告警通知策略上,设置连续N次触发再做消息推送,可以过滤偶尔抖动的情况。
加工全量数据导致的延迟
这是一种预期内的延迟现象,主要发生在作业刚创建时,有两种场景:
- 保存加工作业时,选择从历史上一个时间点(按ServerReceiveTime是计算)开始处理数据
- 保存加工作业时,选择处理Logstore中的全量数据
加工耗时取决于时间段内的数据规模,加工作业需要从旧数据开始直到追赶进度完成。在加工并发足够的情况下,会发现加工延迟有变小的趋势。
如果观察到加工追赶得比较慢,推荐对任务按照时间段做切分来实现加速。例如,要处理9/1到9/10的历史日志,则按照天将任务切分成9个,分治处理:Job_1: [9/1, 9/2)
Job_2: [9/2, 9/3)
....
Job_9: [9/9, 9/10]
加工逻辑复杂导致的延迟
从Logstore的写能力来看,SLS对一个Shard提供5MB/s(原始数据)以上的写入能力,在日志服务集群资源允许时可能支持10MB/s或者更高的写入吞吐。
当真实的大量数据写入源Logstore的Shard后,加工的业务复杂度是不确定的,因此可能发生瓶颈:加工的速率跟不上数据写入的速率。
一个思路是降低加工的复杂度,从逻辑层面优化DSL代码:
- 使用了e_regex,查看正则表达式的复杂度。对正则做优化往往收到不错的效果,尽量让匹配的边界明确化,可以参考如何优化正则表达式。
- 提前对不需要的数据做过滤:例如对e_drop相关代码做前置,在需要丢弃的数据上可以避免无意义的计算。
- 对于一连串复杂操作,通过使用条件判断语句(例如e_if、e_switch等)设置分支条件。
- 使用了e_split等函数对JSON数组进行分裂写出,1份输入对应10倍或更多的写出。
- 其它情况,持续更新中。。。
如果在加工逻辑层面难以做优化,更直接的方式是按照下一节处理(对源Logstore做Shard分裂),加工会根据Shard数目增加并发度。
源Shard数不足导致的延迟
伴随着加工作业的在线上运行,业务流量变化也可能带来源Logstore的流量增长。对源Logstore做Shard分裂(Shard操作说明),可以让每个Shard上的数据量少一些,并有机会得到更多的加工并发。
分裂Shard的限制
分裂Shard不会对历史上已写入的数据做重分布,也就是说老的数据还在旧Shard上,只有新的数据会落到更多的Shard上。
因此,在分裂Shard后,观察加工延迟指标,可能会发现部分Shard的加工延迟高,部分低的情况:
较为可能的原因是老的Shard历史上积攒数据较多,追上进度需要时间,而新的Shard上的数据可以立刻被处理,从Logstore属性查看Shard创建时间可以确认这一点:
关注Shard的状态
源Logstore上,只有readwrite状态的Shard数目对于加工并发有意义,在执行操作时可以选择将Shard一次性分裂为N个。绝大部分情况下不用关注Shard上的BeginKey/EndKey,因为数据默认是随机Shard写入的。
关于自动分裂
Logstore设置的自动分裂属性,仅对数据写入生效,请仔细阅读该开关的说明。
从加工角度来说,加工延迟不会触发自动分裂。
写出目标受阻导致的延迟
SLS对Logstore写入有速率限制,例如有每秒钟要写入10万条日志(每条512 Bytes),则需要规划5~10个Shard。
当目标Logstore readwrite状态Shard数目不足时,加工有类似WriteQuotaExceed或QuotaExceed字样的报错,可以在诊断仪表盘中看到。
处理方法是:对目标Logstore做Shard分裂。解决写出瓶颈后,加工作业停止阻塞,自动恢复数据处理。
总结
参考加工性能指南,建议在部署加工作业之前,从三个层面进行规划:
- 源Logstore,根据数据量调整Shard数目(readwrite状态),满足一定的加工并发度。
- 加工DSL代码逻辑优化,例如对正则的优化、合理做条件剪枝、数据过滤尽量前置。
- 目标Logstore设置足够的Shard数目(readwrite状态),避免加工写出数据受阻。