Spark GraphX
文章目录
第1节 Spark GraphX概述
GraphX 是 Spark 一个组件,专门用来表示图以及进行图的并行计算。GraphX 通过重新定义了图的抽象概念来拓展了 RDD: 定向多图,其属性附加到每个顶点和边。
为了支持图计算, GraphX 公开了一系列基本运算符(比如:mapVertices、mapEdges、subgraph)以及优化后的 Pregel API 变种。此外,还包含越来越多的图算法和构建器,以简化图形分析任务。
GraphX在图顶点信息和边信息存储上做了优化,使得图计算框架性能相对于原生RDD实现得以较大提升,接近或到达 GraphLab 等专业图计算平台的性能。GraphX最大的贡献是,在Spark之上提供一栈式数据解决方案,可以方便且高效地完成图计算的一整套流水作业。
第2节 Spark Grap
架构
存储模式
核心数据结构
GraphX 与 Spark 其他组件相比相对独立,拥有自己的核心数据结构与算子。
2.1 GraphX 架构
GraphX的整体架构可以分为三个部分:
- 算法层。基于 Pregel 接口实现了常用的图算法。包括 PageRank、SVDPlusPlus、TriangleCount、 ConnectedComponents、StronglyConnectedConponents 等算法
- 接口层。在底层 RDD 的基础之上实现了 Pregel 模型 BSP 模式的计算接口
- 底层。图计算的核心类,包含:VertexRDD、EdgeRDD、RDD[EdgeTriplet]
2.2 存储模式
巨型图的存储总体上有边分割和点分割两种存储方式。2013年,GraphLab2.0将其存储方式由边分割变为点分割,在性能上取得重大提升,目前基本上被业界广泛接受并使用。
- 边分割(Edge-Cut):每个顶点都存储一次,但有的边会被打断分到两台机器上。这样做的好处是节省存储空间;坏处是对图进行基于边的计算时,对于一条两个顶点被分到不同机器上的边来说,要跨机器通信传输数据,内网通信流量大
- 点分割(Vertex-Cut):每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上,增加了存储开销,同时会引发数据同步问题。好处是可以大幅减少内网通信量
虽然两种方法互有利弊,但现在是点分割占上风,各种分布式图计算框架都将自己底层的存储形式变成了点分割。主要原因有以下两个:
- 磁盘价格下降,存储空间不再是问题,而内网的通信资源没有突破性进展,集群计算时内网带宽是宝贵的,时间比磁盘更珍贵。这点就类似于常见的空间换时间的策略;
- 在当前的应用场景中,绝大多数网络都是“无尺度网络”,遵循幂律分布,不同点的邻居数量相差非常悬殊。而边分割会使那些多邻居的点所相连的边大多数被分到不同的机器上,这样的数据分布会使得内网带宽更加捉襟见肘,于是边分割存储方式被渐渐抛弃了;
2.3 核心数据结构
核心数据结构包括:graph、vertices、edges、triplets
GraphX API 的开发语言目前仅支持 Scala。GraphX 的核心数据结构 Graph 由 RDD封装而成。
1、Graph
GraphX 用属性图的方式表示图,顶点有属性,边有属性。存储结构采用边集数组的形式,即一个顶点表,一个边表,如下图所示:
顶点 ID 是非常重要的字段,它不光是顶点的唯一标识符,也是描述边的唯一手段。
顶点表与边表实际上就是 RDD,它们分别为 VertexRDD 与 EdgeRDD。在 Spark 的源码中,Graph 类如下:
- vertices 为顶点表,VD 为顶点属性类型
- edges 为边表,ED 为边属性类型
- 可以通过 Graph 的 vertices 与 edges 成员直接得到顶点 RDD 与边 RDD
- 顶点 RDD 类型为 VerticeRDD,继承自 RDD[(VertexId, VD)]
- 边 RDD 类型为 EdgeRDD,继承自 RDD[Edge[ED]]
2、vertices
vertices对应着名为 VertexRDD 的RDD。这个RDD由顶点id和顶点属性两个成员变量。
VertexRDD继承自 RDD[(VertexId, VD)],这里VertexId表示顶点id,VD表示顶点所带的属性的类别。
VertexId 实际上是一个Long类型的数据;
3、edges
edges对应着EdgeRDD。这个RDD拥有三个成员变量,分别是源顶点id、目标顶点id以及边属性。
Edge代表边,由 源顶点id、目标顶点id、以及边的属性构成。
4、triplets
triplets 表示边点三元组,如下图所示(其中圆柱形分别代表顶点属性与边属性):
通过 triplets 成员,用户可以直接获取到起点顶点、起点顶点属性、终点顶点、终点顶点属性、边与边属性信息。triplets 的生成可以由边表与顶点表通过 ScrId 与DstId 连接而成。
triplets对应着EdgeTriplet。它是一个三元组视图,这个视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]]。
第3节 Spark GraphX计算
图的定义
属性操作
转换操作
结构操作
关联操作
聚合操作
Pregel API
引入依赖:
<!-- graphx -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
案例一:图的基本操作
package cn.lagou.graphx
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphxExample1 {
case class User(name: String, age: Int, inDegress: Int, outDegress: Int)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
//定义顶点(Long, info)
val vertexArray: Array[(VertexId, (String, Int))] = Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)))
val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(vertexArray)
//定义边(Long, Long, attr)
val edgeArray: Array[Edge[Int]] = Array(Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3))
val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)
// 图的定义,构造vertexRDD和edgeRDD
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
//属性操作(找出图中年龄>30的顶点,属性>5的边 ,属性>5 的triplets)
graph.vertices.filter { case (_, (_, age)) => age > 30 }.foreach(println)
/**
* (5,(Ed,55))
* (4,(David,42))
* (6,(Fran,50))
* (3,(Charlie,65))
*/
graph.edges.filter { edge => edge.attr > 5 }.foreach(println) //所有的边
/**
* Edge(5,3,8)
* Edge(2,1,7)
*/
graph.triplets.filter(t => t.attr > 5).foreach(println) //属性
/**
* ((2,(Bob,27)),(1,(Alice,28)),7)
* ((5,(Ed,55)),(3,(Charlie,65)),8)
*/
println("*****************属性操作*********************")
//属性操作 degrees操作 , 找出图中最大的出度、入度、度数
//入度最大值inDegress
val inDegress: (VertexId, Int) = graph.inDegrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(s"inDegress = $inDegress")
//出度最大值outDegrees
val outDegrees: (VertexId, Int) = graph.outDegrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(s"outDegrees = $outDegrees")
//度数最大
val degrees: (VertexId, Int) = graph.degrees
.reduce((x, y) => if (x._2 > y._2) x else y)
println(s"degrees = $degrees")
println("******************转换操作**********************")
// 转换操作 // 顶点的转换操作。所有人的年龄加 10 岁
graph.mapVertices{case (id, (name, age)) => (id, (name, age+10))}.vertices .foreach(println)
// 边的转换操作。边的属性*2
graph.mapEdges(e => e.attr*2) .edges .foreach(println)
// 结构操作
// 顶点年龄 > 30 的子图
val subGraph: Graph[(String, Int), Int] = graph.subgraph(vpred = (id, vd) => vd._2 >= 30)
println("************** 子图 ***************")
subGraph.edges.foreach(println) //所有的边
subGraph.vertices.foreach(println) //所有的顶点
println("************** 出度=入度 ***************")
// 找出出度=入度的人员。连接操作
// 思路:图 + 顶点的出度 + 顶点的入度 => 连接操作
val initailUserGraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
val userGraph: Graph[User, Int] = initailUserGraph.outerJoinVertices(initailUserGraph.inDegrees) {
case (id, u, inDeg) => User(u.name, u.age, inDeg.getOrElse(0), u.outDegress)
}.outerJoinVertices(initailUserGraph.outDegrees) {
case (id, u, outDeg) => User(u.name, u.age, u.inDegress, outDeg.getOrElse(0))
}
userGraph.vertices.filter{case (_, user) => user.inDegress==user.outDegress}
.foreach(println)
println("************** 最短距离 ***************")
// 顶点5到其他各顶点的最短距离。聚合操作(Pregel API)
val sourceId: VertexId = 5L
val initailGraph: Graph[Double, Int] = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val disGraph: Graph[Double, Int] = initailGraph.pregel(Double.PositiveInfinity)(
// 两个消息来的时候,取其中的最小路径
(id, dist, newDist) => math.min(dist, newDist),
// Send Message 函数
triplet => {
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else
Iterator.empty
},
// mergeMsg
(dista, distb) => math.min(dista, distb)
)
disGraph.vertices.foreach(println)
sc.stop()
}
}
Pregel API
图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。
所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。
一系列的图并发抽象被提出来用来表达这些迭代算法。GraphX公开了一个类似Pregel的操作。
- vprog:用户定义的顶点运行程序。它作用于每一个顶点,负责接收进来的信息,并计算新的顶点值
- sendMsg:发送消息
- mergeMsg:合并消息
案例二:连通图算法
给定数据文件,找到存在的连通体
package cn.lagou.graphx
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}
object GraphXExample2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
// 生成图
println("************** 生成图 ***************")
val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "data/graph.dat")
graph.vertices.foreach(println)
graph.edges.foreach(println)
// 调用连通图算法
println("************** 连通图算法 ***************")
graph.connectedComponents()
.vertices
.sortBy(_._2)
.foreach(println)
sc.stop()
}
}
案例三:寻找相同的用户,合并信息
假设:
- 假设有五个不同信息可以作为用户标识,分别为:1X、2X、3X、4X、5X;
- 每次可以选择使用若干为字段作为标识
- 部分标识可能发生变化,如:12 => 13 或 24 => 25
根据以上规则,判断以下标识是否代表同一用户:
- 11-21-32、12-22-33 (X)
- 11-21-32、11-21-52 (OK)
- 21-32、11-21-33 (OK)
- 11-21-32、32-48 (OK)
问题:在以下数据中,找到同一用户,合并相同用户的数据
- 对于用户标识(id):合并后去重
- 对于用户的信息:key相同,合并权重
List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0,"area$中关村" -> 1.0)
List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0,"area$回龙观" -> 1.0)
List(41L), List("kw$天津" -> 1.0, "area$中关村" -> 1.0)
List(12L, 22L, 33L), List("kw$大数据" -> 1.0, "kw$spark" -> 1.0,"area$西二旗" -> 1.0)
List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)
List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)
package cn.lagou.graphx
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphXExample3 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("warn")
// 原始数据集
val lst: List[(List[Long], List[(String, Double)])] = List(
(List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中关村" -> 1.0)),
(List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龙观" -> 1.0)),
(List(41L), List("kw$天津" -> 1.0, "area$中关村" -> 1.0)),
(List(12L, 22L, 33L), List("kw$大数据" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
(List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
(List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
)
val rawRDD: RDD[(List[Long], List[(String, Double)])] = sc.makeRDD(lst)
// 创建边。RDD[Edge(Long, Long, T2)]
// List(11L, 21L, 31L), A1 => 11 -> 112131, 21 -> 112131, 31 -> 112131
val dotRDD: RDD[(Long, Long)] = rawRDD.flatMap { case (ids, _) =>
ids.map(id => (id, ids.mkString.toLong))
}
val edgesRDD: RDD[Edge[Int]] = dotRDD.map { case (id, ids) => Edge(id, ids, 0) }
// 创建顶点。RDD[(Long, T1)]
val vertexesRDD: RDD[(Long, String)] = dotRDD.map { case (id, ids) => (id, "") }
// 生成图
val graph: Graph[String, Int] = Graph(vertexesRDD, edgesRDD)
// 调用强连通体算法。识别6条数据,代表2个不同的用户
val connectedRDD: VertexRDD[VertexId] = graph.connectedComponents()
.vertices
// connectedRDD.foreach(println)
// 定义中心的数据
val centerVertexRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] =
rawRDD.map { case (ids, info) => (ids.mkString.toLong, (ids, info)) }
// centerVertexRDD.foreach(println)
// join操作,拿到分组的标记
val dataRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] = connectedRDD.join(centerVertexRDD)
.map { case (_, (v1, v2)) => (v1, v2) }
// 数据聚合、合并
val resultRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] = dataRDD.reduceByKey { case ((bufIds, bufInfo), (ids, info)) =>
// 数据聚合
val newIds: List[VertexId] = bufIds ++ ids
val newInfo: List[(String, Double)] = bufInfo ++ info
// 对用户id做去重;对标签做合并
(newIds.distinct, newInfo.groupBy(_._1).mapValues(lst => lst.map(_._2).sum).toList)
}
resultRDD.foreach(println)
sc.stop()
}
}