Spark从1.6.0版本开始,内存管理模块就发生了改变,旧版本的内存管理模块是实现了StaticMemoryManager 类,现在被称为"legacy"。"Legacy"模式默认被置为不可用,这就意味着当你用Spark1.5.x和Spark1.6.x运行相同的代码会有不同的结果,应当多加注意。考虑的兼容性,可以通过设置spark.memory.useLegacyMode为可用,默认是false.
这篇文章介绍自spark1.6.0版本后的新的内存管理模型,它实现的是UnifiedMemoryManager类。
在这张图中你可以看到三个主要内存区域。
1.Reserved Memory.这部分内存是被系统预留的,它的大小也是被硬编码的。在Spark1.6.0版本,它的大小是300MB,这就意味着这部分内存不能计入Spark内存计算,除非重新编译源码或设置spark.testing.reservedMemory,它的大小是不可改变的,因为park.testing.reservedMemory只是一个测试参数所以在生产中不推荐使用。注意,这部分内存只是被称为“Reserved",实际上它不会被spark用来干任何事情 ,但是它限制了你在spark中可分配的内存大小。即使你想将全部JVM堆内存用于spark缓存数据,也不能使用这部分空闲内存(不是真的就浪费了,其实它存储了Spark的一些内部对象)。供参考,如果你不能为executor至少1.5 * Reserved Memory = 450MB的堆内存,任务将会失败并提示”please use larger heap size“的错误信息。
2.User Memory.这部分内存是分配Spark Memory内存之后的部分,而且这部分用来干什么完全取决于你。你可以用来存储RDD transformations过程使用的数据结构。例如,你可以通过mapPartitions transformation 重写Spark aggregation,mapPartitions transformations 保存hash表保证aggregation运行。这部分数据就保存在User Memory。再次强调,这是User Memory它完全由你决定存什么、如何使用,Spark完全不会管你拿这块区域用来做什么,怎么用,也不会考虑你的代码在这块区域是否会导致内存溢出。
3.Spark Memory.这部分内存就是由Spark管理了。这部分内存大小的计算:(“Java Heap” – “Reserved Memory”) * spark.memory.fraction,而且在spark1.6.0版本默认大小为: (“Java Heap” – 300MB) * 0.75。例如:如果堆内存大小有4G,将有2847MB的Spark Memory,Spark Memory=(4*1024MB-300)*0.75=2847MB。这部分内存会被分成两部分:Storage Memory和Execution Memory,而且这两部分的边界由spark.memory.storageFraction参数设定,默认是0.5即50%。新的内存管理模型中的优点是,这个边界不是固定的,在内存压力下这个边界是可以移动的。如一个区域内存不够用时可以从另一区域借用内存。下边来讨论如何移动及使用的:
1.Storage Memory.这部分内存即可以用来缓存spark数据也可以用来做unroll序列化数据的临时空间。广播变量以block的形式也存储在这里。你奇怪的是unroll,因为你可能会说,并不需要那么多空间去unroll block使其可用——在没有足够内存去unroll bolock的情况下,如果得到持久化级别的允许,将直接在这部分内存unroll block。至于广播变量,当它的持久化级别为MEMORY_AND_DISK时,就会缓存到此。
2.Execution Memory.这部分内存用于存储执行task过程中的一些对象。例如,它可以用来shuflle map端的中间缓存,也可以用来存储hash aggregation过程的hash table.在没有足够内存的时候,这部分内存支持溢室到磁盘,但是这部分内存的blocks不会被其它线程的task挤出去。
下边我们来说一下Storage Memory 和Execution Memory之间的边界移动。从Execution Memory的本质来看,你不能将这部分内存空间的数据挤出去,因为这部分内存的数据是用来计算的中间结果,如果计算过程找不到原来存到这的block数据任务就会失败。但是对于Storage Memory内存就不会这样,它只是用来缓存内存中数据,如果将里边的block数据驱逐出去,就会更新block 元数据映射信息使用到时告知该block被移除了,要想再拿到这些数据从HDD中读取即可(或者如果缓存级别没有溢写就重新计算)。
所以,我们只能Execution Memory可以向Storage Memory挤用空间,反之不可。那么当什么时候会发生Execution Memory 向Storage Memory挤用空间呢?有两种可能:
- 只要Storage Memory有可用空间,就可以增大Execution Memory 大小,减少Storage Memory 大小。
- Storage Memory的空间大小已经超出了初始设定的大小,并且将这部分空间全部占用,在这种情况下就可以强制将从Storage Memory中移出Blocks,减少它的空间到初始大小。
反过来,在只有当Execution Memory空间有空余时,Storage Memory才可以向Execution Memory借用空间,也就是说Execution Memory只要不够用了就可以向Storage Memory挤占空间不管Storage Memory有没有空余,而Storage Memory只能当Execution Memory有空余时才要以借用不能抢占。
初始Storage Memory 大小:“Spark Memory” * spark.memory.storageFraction = (“Java Heap” – “Reserved Memory”) * spark.memory.fraction * spark.memory.storageFraction。根据默认值,即(“Java Heap” – 300MB) * 0.75 * 0.5 = (“Java Heap” – 300MB) * 0.375. 如果Java Heap=4G,那么就有1423.5MB大小的Storage Memory空间。
这就意味着当我们使用Spark cacheu并加载全部数据到executor中时,至少要将Storage Memory大小等于默认初始值大小。因为当Storage Memory区域还没满时,Execution Memory区域已经膨胀大于其初始设定大小时,我们不能强制将Execution Memory抢占的空间数据驱逐,所以最终Storage Memory会变小。
希望这篇文章可以帮你更好的理解spark新的内存管理机制,并以此来应用。