spark(三):blockManager、broadcast、cache、checkpoint

blockManager

spark(三):blockManager、broadcast、cache、checkpoint

  1. Driver和executor上分别都会启动blockManager,其中driver上拥有所有executor上的blockManager的引用;所有executor上的blockManager都持有driver上的blockManager的引用;
  2. blockManagerSlave会不断向blockManagerMaster发送心跳,更新block信息等;
  3. BlockManager对象被创建的时候会创建出MemoryStore和DiskStore对象用以存取block,如果内存中拥有足够的内存, 就 使用 MemoryStore存储, 如果 不够, 就 spill 到 磁盘中, 通过 DiskStore进行存储。
  4. DiskStore 有一个DiskBlockManager,DiskBlockManager 主要用来创建并持有逻辑 blocks 与磁盘上的 blocks之间的映射,一个逻辑 block 通过 BlockId 映射到一个磁盘上的文件。 在 DiskStore 中会调用 diskManager.getFile 方法, 如果子文件夹不存在,会进行创建, 文件夹的命名方式为(spark-local-yyyyMMddHHmmss-xxxx, xxxx是一个随机数), 所有的block都会存储在所创建的folder里面。
  5. MemoryStore 相对于DiskStore需要根据block id hash计算出文件路径并将block存放到对应的文件里面,MemoryStore管理block就显得非常简单:MemoryStore内部维护了一个hash map来管理所有的block,以block id为key将block存放到hash map中。而从MemoryStore中取得block则非常简单,只需从hash map中取出block id对应的value即可。
  6. GET操作 如果 local 中存在就直接返回, 从本地获取一个Block, 会先判断如果是 useMemory, 直接从内存中取出, 如果是 useDisk, 会从磁盘中取出返回, 然后根据useMemory判断是否在内存中缓存一下,方便下次获取, 如果local 不存在, 从其他节点上获取, 当然元信息是存在 drive上的,要根据我们上文中提到的 GETlocation 协议获取 Block 所在节点位置, 然后到其他节点上获取。
  7. PUT操作 操作之前会加锁来避免多线程的问题, 存储的时候会根据 存储级别, 调用对应的是 memoryStore 还是 diskStore, 然后在具体存储器上面调用 存储接口。 如果有 replication 需求, 会把数据备份到其他的机器上面。

cache、persist、checkpoint

  1. 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。
  2. cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
  3. persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。默认缓存级别是StorageLevel.MEMORY_ONLY,也就是cache就是这个默认级别的。
  4. checkpoint是将数据持久化到HDFS或者硬盘。
  5. rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别。前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉( 话说怎么 remove checkpoint 过的 RDD? ),是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

broadcast、accumulator

  1. 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。(注意是一个较大的只读变量,不能修改)
  2. Accumulator是spark提供的累加器,顾名思义,该变量只能够增加。
  3. 只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增加操作(使用 +=)
  4. 使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。如果需要使用多次则使用cache或persist操作切断依赖。
上一篇:功能优异的PDF处理工具


下一篇:tensorflow 模型加载(没有checkpoint文件或者说只加载其中一个模型)