一、简介
通常来讲,从输入源和处理模式的角度来看,我们的streaming有多种模式。包括Message Source based stream,File Source based stream,等等。
我们可以看到,针对一个streaming系统,无论在哪种模式下,structured streaming的运行环境中都会有两个关键因素影响着整体性能。一个是input size,一个是state size。这两个size完全指导于我们整个streaming的全部的流程,与端到端性能强相关。
对于这样的场景,我们用下面这个例子来一步步展开,讲解和分析整个调优过程。
二、输入参数
对于一个streaming系统来讲,数据来源是input上游。对于input的size的限制,直接决定了数据流的时间复杂度。我们整个streaming的处理流程,或者说处理的每一个batch当中的时间复杂度是O(n×m),如下图所示。
输入参数的重要性主要体现在如下图三点:
我们常指的输入参数,主要用于控制单个batch size的控制,以及整体与我们的shuffle,partition,还有cluster的资源的联动,对于我们的整个性能调优是至关重要的。
另外,我们刚才也提到了关于join场景。比方说我们接下来给到的例子,streaming以及静态数据的这样一个join的场景当中。如果说我们对于每个分片的shuffle size的大小,以及如何把它能够转化为一个内存shuffle的场景,也是对于我们输入参数进行一个合适调优的过程。
如果我们用系统默认的maxFilesPerTrigger来进行这样一个计算,会触发哪些问题。以Delta举例,默认的选项就是一千个文件。假设我们的生产环境,每个文件有200MB。在我们的demo中会看到mini-batch越变越慢,也就是说,我们的消费者永远大于生产者,随着时间的推移,整条流的latency越来越大。我相信大家在生产环境中也会经常遇到类似场景。
接下来的demo中,我们演示maxFilesPerTrigger的调参。这里我们有两个目标值。第一个目标值是我们希望每一个shuffle partition size在100~200MB。这个值是怎么来的,某种程度上,某一个集群配置,或者说某一种集群配置下,这个值都是不一致的。需要大家按照自己的集群大小,包括memory,配置进行一个预估和调整。第二个,我们需要shuffle partition和core的值是相等的。
可以看到在当前demo中,我们通过maxFilesPerTrigger调整到6个files,这个时候就没有shuffle再发生了。同时,我们的Processed Records/Seconds比原来的这样一个比例调整上升了30%。
屏蔽了shuffle spill这样一个耗时操作之后,我们的调优工作就结束了吗,其实并不是。我们还可以做进一步的调优。正如刚才背景介绍所说,我们的一条动态流和一条静态的DataFrame进行join,但是我们的静态DataFrame其实完全意义上来讲是可以做broadcast hash join。
三、状态参数
这一部分是关于状态参数。在我们刚才一开始提到的影响streaming的两个维度,一个是input size,另一个是state size。我们需要进一步的对state size有一个很好的限制。否则,我们每一个input,需要在state store当中去做查找,或者说做匹配,做一系列的两个batch之间的状态互通的操作的时候,就会导致一系列的性能问题。
这里的state主要包括两个部分:第一部分是State Store backed operations,第二部分是Delta Lake table or external system。
状态参数的重要性如下图所示。因为每一个batch都需要对state当中的操作进行查询以及按需更新的步骤。无限增长的state肯定会让你的作业越跑越慢,并且到最后消费者跟不上生产者。与此同时,它也会带来我们刚才遇到的问题,比方说shuffle spill或常见的out of memory的问题。
下一个demo中,我们主要会给大家演示的是这两类的state store。第一种与operation强相关。比方说,按需选择watermarking是我们需要保留的history的一个水位线。也就是说,我们当前的系统考虑多久之前的数据就算过期了,我们就可以不用再考虑。另外一个就是哪一种state store我们正在被使用。第二种完全跟具体操作无关,需要去看我们的query的predicate,具体的query实现是什么样的。然后看我们每一次参与计算的batch对应的量,以及state的一个修改情况。
接下来看一下状态参数的例子。第一部分和第二部分跟上文中的例子保持一致,没有什么变化。这个例子着重于第3部分,Aggregate sales per item per hour。
四、输出参数
接下来我们看一下输出参数,相对于前几种场景比较特殊。在上文中提到的input state对structured streaming的影响的二维象限里面,其实并没有output。Structured streaming框架本身并不会受到输出参数的影响。但是它更多的影响其实是在下游。对于streaming系统来讲,我们期望整个端到端的延迟也好,吞吐也好,达到一个整体最优化。
我们可以看到,如果输出参数不是一个很合适的值,最常见的问题就是小文件问题。我们会spill,或者说我们会写出大量的这种小的文件,导致下游的读取文件的操作会变得很慢。与此同时,以这样的方式去写出的时候,其实对框架本身也是一种开销。
其实商业系统当中对于输出参数是有一系列的优化的。比方说,Delta Lake系统当中对于output有一个Auto-Optimize的功能,它是默认打开的一个feature。Delta Lake在每一个batch写出的时候,他都会根据自己的合适的partition,以及file size进行一个自动的调优的输出。
五、部署
最后,对于我们在线下调整好streaming的参数,如何保证一系列的性能指标在上线之后也有同样好的性能。首先需要考虑的就是关于driver节点的部署。常见的情况来讲,一开始觉得非重要的streaming流,我们会将它线下的batch job与其他的streaming job进行一个混合部署,用同一个spark cluster。但是这种情况下,我们就会发现不同的作业会互相影响,尤其是driver节点会有一个对应的性能影响。所以在一开始部署,或者说在集群治理的角度,我们需要关注对应的streaming流是否可以和其他的作业进行混合部署。
与此同时,有可能的弹性伸缩的需求。我们之前期望core和shuffle partition尽量一致,来尽可能的用到系统cluster当中的所有资源。但是如果说你后期会有弹性的部署需求,比方说,我希望有临时的一个集群扩展,对于streaming的cluster。这个时候,其实是有一个限制在里面。就是说如果你的shuffle partition一开始设置的比原有的core要小,这种情况下,集群的扩展对你原有的系统是没有用的。因为你的shuffle partition对应的参数写在checkpoint file当中去了,所以不会增加额外的资源。
下一点有关于capacity planning。因为shuffle partition是一个fixed的checkpoint里边对应的一个值。我们如果说打破了对应的这样一个限制,真正重启作业的时候需要将checkpoint清掉。最后,我们需要在一开始的时候就考虑state恢复的机制。
我们一共讲了四个方向,对于streaming性能有一定影响,如下图所示,分别是输入参数,状态参数,输出参数,以及部署。
关键词:Spark Structured Streaming,input parameter, state parameter,output parameter,参数调优
获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<