Spark GraphX图计算结构详解【图构建器、顶点、边】

一.图构建器

  GraphX提供了几种从RDD或磁盘上的顶点和边的集合构建图形的方法。默认情况下,没有图构建器会重新划分图的边;相反,边保留在默认分区中。Graph.groupEdges要求对图进行重新分区,因为它假定相同的边将在同一分区上放置,因此在调用Graph.partitionBy之前必须要调用groupEdges。 

 1 package org.apache.spark.graphx
 2 
 3 import org.apache.spark.SparkContext
 4 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 5 import org.apache.spark.internal.Logging
 6 import org.apache.spark.storage.StorageLevel
 7 
 8 /**
 9  * Provides utilities for loading [[Graph]]s from files.
10  */
11 object GraphLoader extends Logging {
12 
13   /**
14    * Loads a graph from an edge list formatted file where each line contains two integers: a source
15    * id and a target id. Skips lines that begin with `#`.
16    */
17   def edgeListFile(
18       sc: SparkContext,
19       path: String,
20       canonicalOrientation: Boolean = false,
21       numEdgePartitions: Int = -1,
22       edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, //缓存级别,只保存到内存
23       vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
24     : Graph[Int, Int] =
25   {
26     val startTime = System.currentTimeMillis
27 
28     // Parse the edge data table directly into edge partitions
29     val lines =
30       if (numEdgePartitions > 0) { // 加载文件数据
31         sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
32       } else {
33         sc.textFile(path)
34       } // 按照分区进行图构建
35     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
36       val builder = new EdgePartitionBuilder[Int, Int]
37       iter.foreach { line =>
38         if (!line.isEmpty && line(0) != '#') { // 过滤注释行
39           val lineArray = line.split("\\s+")
40           if (lineArray.length < 2) { // 识别异常数据
41             throw new IllegalArgumentException("Invalid line: " + line)
42           }
43           val srcId = lineArray(0).toLong
44           val dstId = lineArray(1).toLong
45           if (canonicalOrientation && srcId > dstId) {
46             builder.add(dstId, srcId, 1)// 逐个添加边及权重
47           } else {
48             builder.add(srcId, dstId, 1)
49           }
50         }
51       }
52       Iterator((pid, builder.toEdgePartition))
53     }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
54     edges.count() // 触发执行
55 
56     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
57 
58     GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
59       vertexStorageLevel = vertexStorageLevel)
60   } // end of edgeListFile
61 
62 }

  GraphLoader.edgeListFile是从磁盘或HDFS类似的文件系统中加载图形数据,解析为(源顶点ID, 目标顶点ID)对的邻接列表,并跳过注释行。Graph从指定的边开始创建,然后自动创建和边相邻的任何节点。所有顶点和边属性均默认为1。参数canonicalOrientation允许沿正方向重新定向边,这是所有连接算法所必须的。

 

 

 

上一篇:GraphX 学习笔记


下一篇:Spark Graphx常用函数