I. Graph Builders
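GraphX provides several ways to build a graph from a collection of vertices and edges, either in an RDD or on disk. By default, none of the graph builders repartitions the graph's edges; instead, the edges are left in their default partitions. Graph.groupEdges requires the graph to be repartitioned, because it assumes that identical edges are colocated on the same partition. You must therefore call Graph.partitionBy before calling groupEdges.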
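To see why the call order matters, here is a minimal plain-Scala model with no Spark dependency. It assumes that groupEdges only merges duplicates that sit in the same partition. Partitions are modeled as nested vectors, and the helpers Edge, groupEdgesLocally and partitionByEdge are hypothetical stand-ins, not GraphX APIs. A duplicate edge spread across two partitions survives local merging, while hash-partitioning on (srcId, dstId) first brings the duplicates together so they merge into one edge.

```scala
// Plain-Scala model (no Spark) of why groupEdges needs identical edges colocated.
// Edge, groupEdgesLocally and partitionByEdge are hypothetical, NOT GraphX APIs.
object GroupEdgesSketch {
  // Hypothetical stand-in for a GraphX edge: (srcId, dstId, attr)
  final case class Edge(src: Long, dst: Long, attr: Int)

  // Merges duplicate (src, dst) edges, but only inside each partition,
  // modeling the assumption that groupEdges works partition by partition.
  def groupEdgesLocally(parts: Vector[Vector[Edge]], merge: (Int, Int) => Int): Vector[Vector[Edge]] =
    parts.map { part =>
      part.groupBy(e => (e.src, e.dst)).toVector.map { case ((s, d), es) =>
        Edge(s, d, es.map(_.attr).reduce(merge))
      }.sortBy(e => (e.src, e.dst))
    }

  // Hypothetical hash partitioning on (src, dst): identical edges land in the same partition.
  def partitionByEdge(parts: Vector[Vector[Edge]], numParts: Int): Vector[Vector[Edge]] = {
    val all = parts.flatten
    Vector.tabulate(numParts) { p =>
      all.filter(e => java.lang.Math.floorMod((e.src, e.dst).hashCode, numParts) == p)
    }
  }

  def main(args: Array[String]): Unit = {
    // The duplicate edge 1 -> 2 starts out in two different partitions.
    val initial = Vector(
      Vector(Edge(1, 2, 1), Edge(2, 3, 1)),
      Vector(Edge(1, 2, 1))
    )
    val withoutRepartition = groupEdgesLocally(initial, _ + _).flatten
    val withRepartition = groupEdgesLocally(partitionByEdge(initial, 2), _ + _).flatten
    println(withoutRepartition.count(e => e.src == 1 && e.dst == 2)) // duplicates survive
    println(withRepartition.count(e => e.src == 1 && e.dst == 2))    // merged into one edge
  }
}
```

In GraphX itself the corresponding order is along the lines of graph.partitionBy(PartitionStrategy.EdgePartition2D).groupEdges(_ + _); the choice of EdgePartition2D here is only an example strategy.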
```scala
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel

/**
 * Provides utilities for loading [[Graph]]s from files.
 */
object GraphLoader extends Logging {

  /**
   * Loads a graph from an edge list formatted file where each line contains two integers: a source
   * id and a target id. Skips lines that begin with `#`.
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, // storage level: memory only
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : Graph[Int, Int] =
  {
    val startTime = System.currentTimeMillis

    // Parse the edge data table directly into edge partitions
    val lines =
      if (numEdgePartitions > 0) { // load the file with the requested number of partitions
        sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
      } else {
        sc.textFile(path)
      }
    // Build one edge partition per input partition
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') { // skip empty lines and comment lines
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) { // reject malformed lines
            throw new IllegalArgumentException("Invalid line: " + line)
          }
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1) // add each edge with attribute (weight) 1
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count() // action that triggers the computation

    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1,
      edgeStorageLevel = edgeStorageLevel, vertexStorageLevel = vertexStorageLevel)
  } // end of edgeListFile

}
```
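GraphLoader.edgeListFile loads graph data from disk or an HDFS-like file system, parses it as an adjacency list of (source vertex ID, destination vertex ID) pairs, and skips comment lines (lines beginning with #). It creates a Graph from the specified edges and automatically creates every vertex those edges mention. All vertex and edge attributes default to 1. The canonicalOrientation argument reorients edges in the positive direction (srcId < dstId), which is required by the connected components algorithm.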
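The parsing rules described above can be sketched in plain Scala without Spark. This is a minimal sketch, assuming the same rules as the listing: skip empty lines and lines starting with #, split on whitespace, reject lines with fewer than two fields, swap the ids when canonicalOrientation is set and srcId > dstId, and give every edge the attribute 1. The names EdgeListParseSketch, parseLines and impliedVertices are hypothetical. impliedVertices only models the idea that vertices come from the edges with the default attribute 1; it is not how GraphImpl.fromEdgePartitions is implemented.

```scala
// Plain-Scala sketch of edgeListFile's per-line parsing rules (no Spark).
// EdgeListParseSketch, parseLines and impliedVertices are hypothetical names.
object EdgeListParseSketch {
  // (srcId, dstId, attr); every edge attribute is 1, as in edgeListFile
  type EdgeTriple = (Long, Long, Int)

  def parseLines(lines: Seq[String], canonicalOrientation: Boolean): Seq[EdgeTriple] =
    lines.filter(line => line.nonEmpty && line(0) != '#').map { line =>
      val fields = line.split("\\s+")
      if (fields.length < 2) throw new IllegalArgumentException("Invalid line: " + line)
      val src = fields(0).toLong
      val dst = fields(1).toLong
      // Canonical orientation: store the smaller id as the source
      if (canonicalOrientation && src > dst) (dst, src, 1) else (src, dst, 1)
    }

  // Vertices are implied by the edges; each one gets the default attribute 1
  def impliedVertices(edges: Seq[EdgeTriple]): Map[Long, Int] =
    edges.flatMap { case (s, d, _) => Seq(s, d) }.distinct.map(id => id -> 1).toMap

  def main(args: Array[String]): Unit = {
    val input = Seq("# comment", "2 1", "", "2 3", "4\t1")
    val edges = parseLines(input, canonicalOrientation = true)
    println(edges)
    println(impliedVertices(edges).keys.toSeq.sorted)
  }
}
```

With canonicalOrientation = true, the lines "2 1" and "4\t1" become the edges 1 -> 2 and 1 -> 4, and the vertex set {1, 2, 3, 4} is derived purely from the edges.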