Hama学习笔记
1. Hama定义
Hama是基于HDFS上的BSP模型实现,其执行不须要MapReduce。
例证例如以下: 在单点调试的Hama系统上,仅仅执行NameNode、DataNode、BSPMasterRunner、GroomServerRunner和 ZooKeeperRunner进程。就可以执行PageRank程序。
2. MapReduce与BSP差别
运行机制:MapReduce是一个数据流模型,每一个任务仅仅是对输入数据进行处理,产生的输出数据作为还有一个任务的输入数据。并行任务之间独立地进行,串行任务之间以磁盘和数据复制作为交换介质和接口。
BSP是一个状态模型,各个子任务在本地的子图数据上进行计算、通信、改动图的状态等操作。并行任务之间通过消息通信交流中间计算结果,不须要像MapReduce那样对全体数据进行复制。
迭代处理:MapReduce模型理论上须要连续启动若干作业才干够完毕图的迭代处理,相邻作业之间通过分布式文件系统交换所有数据。BSP模型仅启动一个作业。利用多个超步就能够完毕迭代处理。两次迭代之间通过消息传递中间计算结果。
因为降低了作业启动、调度开销和磁盘存取开销,BSP模型的迭代运行效率较高。
数据切割:基于BSP的图处理模型,须要对载入后的图数据进行一次再分布的过程,以确定消息通信时的路由地址。比如,各任务并行载入数据过程中。依据一定的映射策略。将读入的数据又一次分发到相应的计算任务上(一般是放在内存中),既有磁盘I/O又有网络通信,开销非常大。可是一个BSP作业仅需一次数据切割,在之后的迭代计算过程中除了消息通信之外。不再须要进行数据的迁移。而基于MapReduce的图处理模型。普通情况下,不须要专门的数据切割处理。可是Map阶段和Reduce阶段次年在中间结果的Shuffle过程。添加了磁盘I/O和网络通信开销。
总结:MapReduce发送数据+消息。而Hama仅仅发送消息。在Hama的超步迭代过程中,当某个BSPPeer收到其它BSPPeer发送过来的某顶点的消息。进行消息处理,而后要把处理结果发送到该节点的邻接节点,因此该节点的数据信息也必须存在该BSPPeer中,故必须在对数据载入到内存时进行一次Hash再分布。
以下分析Hama中数据再分布的机制,源代码位于GraphJobRunner.loadVertices()方法中。首先获取每一个BSPPeer的数据分片大小splitSize。举比例如以下表1所看到的:
表1 BSPPeer数据量信息
Peer序号 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
数据量 |
62M |
64M |
54M |
在GraphJobRunner.partitionMultiSteps(BSPPeer,splitSize)方法中,每一个BSPPeer把自己的splitSize发送给MasterPeer。
进行同步后,在MasterPeer上找到最大全部BSPPeer上最大的splitSize赋值给maxSplitSize,即maxSplitSize等于BSPPeer2上的64M。
然后依照例如以下公式计算计算数据载入后Hash再分布的同步次数steps:
maxSplitSize/conf.getLong("hama.graph.multi.step.partitioning.interval",20000000) +1
由此公式可知。用户可配置hama.graph.multi.step.partitioning.interval的大小。但在hama-default.xml未找到此项。
hama.graph.multi.step.partitioning.interval含义:表示Hash再分布时进行同步的最大块单元,默认是20M。
steps = 64M / 20M + 1 = 4 (进行4次同步)
然后MasterPeer把该steps值发送给全部的BSPPeer。并在每一个BSPPeer中赋值给GraphJobRunner. partitioningSteps变量(值为4)。
在每一个BSPPeer计算各自的Hash再分布时的块同步单元:interval = splitSize / partitioningSteps。计算结果例如以下表 2所看到的:
表2 每一个BSPPeer进行Hash再分布的块信息
Peer序号 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
数据量 |
62M |
64M |
54M |
partitioningSteps值 |
4 |
4 |
4 |
Interval值 |
15M |
16M |
13M |
每次同步块大小(M) |
15、15、15、 |
16、16、16、 |
13、13、13、 |
每一个BSPPeer依次从HDFS上读取数据,并依据Hash进行发送(每读入一个顶点就发送一次),当发送量达到自己的块同步单元后(BSPPeer1:15M,BSPPeer2:16M,BSPPeer3:13M)。进行一次同步。各BSPPeer把接受到的数据载入的内存中,即存储于GraphJobRunner.Vertices变量中。按此进行3(partitioningSteps-1)次。
最后一次中,BSPPeer1发送17M数据,BSPPeer2发送16M数据。BSPPeer3发送15M数据,再进行同步,而后载入到GraphJobRunner.Vertices中。
数据Hash重分布之后,每一个BSPPeer上的顶点vertices大小分布可能例如以下表3所看到的,当中如果每一个顶点的大小40byte(实际每一个顶点大小会不同,如PageRank。
此处仅仅是为了举例说明算法)。再补充GraphJobRunner中vertices的定义:
List<Vertex<V, E, M>> vertices =new ArrayList<Vertex<V, E, M>>()
表3 BSPPeer进行Hash重分布后Vertices.size信息
Peer序号 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
数据量 |
80.2 M |
40.16 M |
59.64 M |
Vertices.size |
2005 K |
1004 K |
1491 K |
以下阐述Hama的数据修复(Repair)机制。源代码位于GraphJobRunner. repair()方法中,此方法在loadVertices()方法的最后调用。
先用单个BSPPeer上的样例介绍数据修复的概念。
如对于PageRank,眼下实际有四个顶点。例如以下图1所看到的。而用户输入的数据例如以下:
1 2 3
2 3
3 1 4
图1 PageRank图
但用户没有写4顶点的信息。应该写为: 4 邻接顶点,当其邻接边为空的时候,也应该写为:4 空(实际不写“空”,为了文档描写叙述方便)。数据修复的目的就是添加:“4 边空”这条信息。事实上是把4顶点作为悬挂顶点来处理。
在超步(S-1)中3顶点会把其PR值的1/2发送给顶点4应该所在的BSPPeer(实际没有4顶点的信息)。在超步S中,若数据载入时没有进行过数据修复,则BSPPeer没有4顶点的信息,不如直接把其临边作为空处理即可。这和数据修复效果一样。这样做不是更加简单吗?为什么要花那么大的代价进行数据修复呢?
解释:上述在计算过程中直接把其邻接边作为空的方案是不对的。由于在计算顶点总数(等于每一个BSPPeer上的Vertices.size之和)时就会出错,导致给每一个顶点的初始值就会出错。然后再导致aggregator出错。
每一个BSPPeer获取其上Vertices的大小,都发送给MasterPeer。
在MasterPeer上找到最小的minVerticesSize,再计算数据修复时的同步次数multiSteps。公式例如以下:
multiSteps = min { minVerticesSize , ( partitioningSteps * 2 ) }
分析:一般minVerticesSize都大于( partitioningSteps* 2 )。如对上例minVerticesSize的大小为1000k,而( partitioningSteps *2 ) = 4*2 = 8,故multiSteps的值为8。
然后MasterPeer把此值发送给全部的BSPPeer,每一个BSPPeer存储于自己的变量multiSteps中。在每一个BSPPeer计算各自数据修复时的块同步单元:vertices.size/
multiSteps。注意:此时进行同步的单元不是数据量大小,而是顶点的数目。计算结果例如以下所看到的。
表 4 每一个BSPPeer进行数据修复时的同步信息
Peer序号 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
数据量 |
80.2 M |
40.16 M |
59.64 M |
Vertices.size |
2005 K |
1004 K |
1491 K |
顶点同步单元 |
250 K |
125 K |
186 K |
multiSteps次同步后剩余 |
5 k |
4 k |
3 k |
每一个BSPPeer依次从内存(vertices变量)上读取每一个顶点,获取其邻接顶点后,再依据其Hash值把邻接顶点的id发送到对应的BSPPeer上。当发送顶点的数目达到各自的同步单元后(BSPPeer1:250 K,BSPPeer2:125 K。BSPPeer3:186 K),进行一次同步。各BSPPeer把接收到的数据存储于暂时变量tmp(其定义为:new HashMap<V, Vertex<V, E, M>>()。V用来存储邻接顶点的id,Vertex是以邻接顶点id为VertexID且Edges为空的顶点)中。
按此进行multiSteps次(8)。注意:与数据载入后Hash再分布时的(partitioningSteps-1)次不同。
进行multiSteps后,三个BSPPeer节点依旧剩余5 K、4 K 、3 K。再进行最后一次同步。各BSPPeer依旧后收到的数据载入到tmp变量中。
然后每一个BSPPeer扫描自己的vertices。把VertexID属于tmp的从tmp中删除。
最后把tmp中剩余的顶点相应的Vertex(以邻接顶点id为VertexID且Edges为空)增加到GraphJobRunner.Vertices中,至此数据修复完毕。